Streams: o guia definitivo

Saiba como usar streams legíveis, graváveis e de transformação com a API Streams.

Com a API Streams, é possível acessar programaticamente fluxos de dados recebidos pela rede ou criados por qualquer meio local e processá-las com JavaScript. Streaming envolve dividir um recurso que você quer receber, enviar ou transformar em pedaços pequenos e depois processá-los bit a bit. O streaming é algo os navegadores fazem ao receber recursos como HTML ou vídeos para exibição em páginas da Web, O recurso nunca esteve disponível para JavaScript antes do lançamento do fetch, com streams, em 2015.

Antes, se você quisesse processar algum tipo de recurso (seja um vídeo, um arquivo de texto etc.), você teria que baixar o arquivo inteiro, esperar que ele seja desserializado em um formato adequado, e processá-la. Com as transmissões disponíveis para JavaScript, tudo isso muda. Agora você pode processar dados brutos com JavaScript progressivamente conforme assim que estiverem disponíveis no cliente, sem a necessidade de gerar um buffer, uma string ou um blob. Isso libera vários casos de uso, como os listados abaixo:

  • Efeitos de vídeo: canalização de um stream de vídeo legível por um stream de transformação que aplica efeitos em tempo real.
  • (des)compressão de dados: canalização de um stream de arquivos por um stream de transformação que seleciona (des)compacta-o.
  • Decodificação de imagem: canalização de um fluxo de resposta HTTP por um fluxo de transformação que decodifica bytes em dados de bitmap e por outro fluxo de transformação que traduz bitmaps em PNGs. Se instalado dentro do gerenciador fetch de um service worker, isso permite aplicar o polyfill de maneira transparente em novos formatos de imagem, como AVIF.

Suporte ao navegador

ReadableStream e WritableStream

Compatibilidade com navegadores

  • Chrome: 43.
  • Borda: 14.
  • Firefox: 65.
  • Safari: 10.1.

Origem

TransformStream

Compatibilidade com navegadores

  • Chrome: 67.
  • Borda: 79.
  • Firefox: 102
  • Safari: 14.1.

Origem

Principais conceitos

Antes de entrar em detalhes sobre os vários tipos de fluxo, vou apresentar alguns conceitos básicos.

Pedaços

Um bloco é um único dado gravado ou lido em um stream. Pode ser de qualquer type podem até conter blocos de diferentes tipos. Na maioria das vezes, um bloco não será o mais atômico unidade de dados de um determinado fluxo. Por exemplo, um stream de bytes pode conter blocos compostos de 16 Unidades Uint8Array de KiB, em vez de bytes únicos.

Streams legíveis

Um stream legível representa uma fonte de dados que podem ser lidos. Em outras palavras, os dados vêm fora de um stream legível. Concretamente, um stream legível é uma instância do ReadableStream .

Streams graváveis

Um stream gravável representa um destino para dados em que é possível gravar. Em outras palavras, os dados entra em um stream gravável. Concretamente, um stream gravável é uma instância do WritableStream.

Streams de transformação

Um stream de transformação consiste em um par de streams: um stream gravável, conhecido como lado gravável, e um stream legível, conhecido como lado legível. Uma metáfora do mundo real para isso seria intérprete simultâneo que traduz de um idioma para outro instantaneamente. De maneira específica para o fluxo de transformação, gravar no lado gravável resulta na disponibilização de novos dados para leitura do no lado legível. Concretamente, qualquer objeto com propriedades writable e readable pode ser veiculado como um fluxo de transformação. No entanto, a classe TransformStream padrão facilita a criação um par que fique corretamente emaranhado.

Correntes para tubos

Os streams são usados principalmente encadeando uns aos outros. É possível encadear um stream legível para um stream gravável, usando o método pipeTo() do stream legível, ou pode ser canalizado por um ou mais streams de transformação primeiro, usando o método pipeThrough() do stream legível. Um conjunto de os fluxos juntos dessa forma são chamados de "cadeia de pipelines".

Contrapressão

Uma vez que uma cadeia de canos é construída, ela propagará sinais sobre a rapidez com que os blocos devem fluir por ele. Se alguma etapa da cadeia ainda não puder aceitar partes, ela propaga um sinal para trás pela cadeia do {i>pipe<i}, até que, eventualmente, a fonte original seja instruída a parar de produzir blocos rápido. Esse processo de normalização do fluxo é chamado de contrapressão.

Teeing

Um stream legível pode ser vinculado (nomeado de acordo com a forma de um "T" maiúsculo) usando o método tee(). Isso bloqueia o stream, ou seja, faz com que ele não seja mais diretamente utilizável. No entanto, isso criará duas novas streams, chamados de ramificações, que podem ser consumidos de maneira independente. O Teeing também é importante porque os streams não podem ser retrocedidos ou reiniciados. Falaremos mais sobre isso depois.

Diagrama de uma cadeia de canos que consiste em um fluxo legível proveniente de uma chamada para a API de busca, que é encadeado por um fluxo de transformação com saída e enviada ao navegador para o primeiro fluxo legível resultante e ao cache do service worker para o segundo fluxo legível resultante.
Uma cadeia de cano.

a mecânica de um stream legível;

Um stream legível é uma fonte de dados representada em JavaScript por uma O objeto ReadableStream que flui de uma fonte subjacente. A ReadableStream() O construtor cria e retorna um objeto de fluxo legível dos manipuladores fornecidos. Existem duas tipos de origem:

  • As fontes push enviam dados constantemente quando você as acessa e cabe a você iniciar, pausar ou cancelar o acesso à transmissão. Os exemplos incluem streams de vídeo ao vivo, eventos enviados pelo servidor, ou WebSockets.
  • As origens pull exigem que você solicite dados explicitamente a partir delas depois de conectadas. Exemplos incluir operações HTTP usando chamadas fetch() ou XMLHttpRequest.

Os dados de stream são lidos sequencialmente em pequenos pedaços chamados blocos. Os blocos colocados em um stream são considerados enfileirados. Isso significa que eles estão esperando em uma fila prontos para serem lidos. Uma fila interna monitora os blocos que ainda não foram lidos.

Uma estratégia de enfileiramento é um objeto que determina como um stream deve sinalizar a contrapressão com base em o estado da fila interna dele. A estratégia de enfileiramento atribui um tamanho a cada bloco e compara o tamanho total de todos os fragmentos na fila para um número especificado, conhecido como marca alta.

Os blocos dentro do stream são lidos por um leitor. Este leitor recupera os dados um bloco por vez o que permite realizar o tipo de operação que você quiser. O leitor e o outro que o acompanha é chamado de consumidor.

A próxima construção nesse contexto é chamada de controlador. Cada stream legível tem um ID que, como o nome sugere, permite controlar a transmissão.

Somente um leitor pode ler um stream por vez. quando um leitor é criado e começa a ler um stream (ou seja, se torna um leitor ativo), ele fica bloqueado a ele. Se você quer que outro leitor assuma lendo sua transmissão, você normalmente precisa liberar o primeiro leitor antes de fazer qualquer outra coisa (embora você possa tee transmissões).

Como criar um stream legível

Você cria um stream legível chamando o construtor ReadableStream() O construtor tem um argumento opcional underlyingSource, que representa um objeto com métodos e propriedades que definem como a instância de stream construída se comportará.

O underlyingSource

Isso pode usar os seguintes métodos opcionais definidos pelo desenvolvedor:

  • start(controller): chamado imediatamente quando o objeto é construído. A pode acessar a origem do stream e fazer o que mais for necessário necessárias para configurar a funcionalidade de transmissão. Se esse processo for realizado de modo assíncrono, o método poderá retornam uma promessa para sinalizar o sucesso ou a falha. O parâmetro controller transmitido a esse método é por ReadableStreamDefaultController.
  • pull(controller): pode ser usado para controlar o stream à medida que mais partes são buscadas. Ela é chamado repetidamente desde que a fila interna de blocos do stream não esteja cheia, até que a fila atinge sua marca-d'água a mais. Se o resultado da chamada de pull() for uma promessa, pull() não vai ser chamado novamente até que essa promessa seja atendida. Se a promessa for rejeitada, ocorrerá um erro no stream.
  • cancel(reason): chamado quando o consumidor de stream cancela o stream.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

O ReadableStreamDefaultController é compatível com os seguintes métodos:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

O queuingStrategy

O segundo argumento, também opcional, do construtor ReadableStream() é queuingStrategy. É um objeto que, opcionalmente, define uma estratégia de enfileiramento para o stream, que usa dois parâmetros:

  • highWaterMark: um número não negativo que indica a marca d'água alta do fluxo usando essa estratégia de enfileiramento.
  • size(chunk): uma função que calcula e retorna o tamanho finito não negativo do valor do bloco especificado. O resultado é usado para determinar a pressão de retorno, que se manifesta usando a propriedade ReadableStreamDefaultController.desiredSize adequada. Ele também determina quando o método pull() da origem é chamado.
.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Os métodos getReader() e read()

Para fazer a leitura de um stream legível, você precisa de um leitor, que será um ReadableStreamDefaultReader O método getReader() da interface ReadableStream cria um leitor e bloqueia o stream para reimplantá-lo. Enquanto o stream estiver bloqueado, nenhum outro leitor poderá ser adquirido até que este seja liberado.

O read() da interface ReadableStreamDefaultReader retorna uma promessa que fornece acesso ao próximo bloco na fila interna do stream. Ela é atendida ou rejeitada com um resultado, dependendo do estado do no fluxo. As diferentes possibilidades são as seguintes:

  • Se um bloco estiver disponível, a promessa será atendida com um objeto do formulário
    { value: chunk, done: false }.
  • Se o stream for fechado, a promessa será cumprida com um objeto do formulário
    { value: undefined, done: true }.
  • Se o stream gerar um erro, a promessa vai ser rejeitada com o erro relevante.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

A propriedade locked

É possível verificar se um stream legível está bloqueado acessando o ReadableStream.locked .

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Amostras de código de stream legíveis

O exemplo de código abaixo mostra todas as etapas em ação. Primeiro, você cria um ReadableStream que, O argumento underlyingSource (ou seja, a classe TimestampSource) define um método start(). Esse método instrui o controller do stream a enqueue() um carimbo de data/hora a cada segundo durante dez segundos. Por fim, ele instrui o controlador a usar close() no stream. Você consumir isto criando um leitor pelo método getReader() e chamando read() até que o stream seja done

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iteração assíncrona

Verificar cada iteração de loop read() se o stream é done pode não ser a API mais conveniente. Felizmente, em breve haverá uma maneira melhor de fazer isso: a iteração assíncrona.

for await (const chunk of stream) {
  console.log(chunk);
}

Uma solução alternativa para usar a iteração assíncrona hoje é implementar o comportamento com um polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Como estabelecer um stream legível

O método tee() do A interface ReadableStream faz o tee do fluxo legível atual, retornando uma matriz de dois elementos contendo as duas ramificações resultantes como novas instâncias ReadableStream. Isso permite dois leitores ler um stream simultaneamente. Você pode fazer isso, por exemplo, em um service worker se você deseja obter uma resposta do servidor e transmiti-la para o navegador, mas também transmiti-la para o cache de service worker. Como um corpo de resposta não pode ser consumido mais de uma vez, são necessárias duas cópias para fazer isso. Para cancelar o fluxo, é necessário cancelar as duas ramificações resultantes. Tendo em um stream geralmente o bloqueará pela duração, impedindo que outros leitores o bloqueiem.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Streams de bytes legíveis

Para streams que representam bytes, uma versão estendida do stream legível é fornecida para tratar com eficiência, especialmente minimizando cópias. Os streams de bytes permitem que você traga seu próprio buffer (BYOB) a serem adquiridos. A implementação padrão pode fornecer vários resultados diferentes, como como strings ou buffers de matriz no caso de WebSockets, enquanto os streams de bytes garantem a saída de bytes. Além disso, os leitores BYOB têm benefícios de estabilidade. Isso é porque, se um buffer for removido, isso pode garantir que um não seja gravado no mesmo buffer duas vezes, portanto, evitando condições de corrida. Os leitores de BYOB podem reduzir o número de vezes que o navegador precisa ser executado coleta de lixo, porque ela pode reutilizar buffers.

Como criar um fluxo de bytes legível

É possível criar um fluxo de bytes legível transmitindo um parâmetro type extra para o construtor ReadableStream().

new ReadableStream({ type: 'bytes' });

O underlyingSource

A origem de um stream de bytes legível recebe um ReadableByteStreamController para manipular. O método ReadableByteStreamController.enqueue() usa um argumento chunk cujo valor é um ArrayBufferView. A propriedade ReadableByteStreamController.byobRequest retorna o valor Solicitação de envio BYOB ou nula se não houver nenhuma. Por fim, o método ReadableByteStreamController.desiredSize retorna o tamanho desejado para preencher a fila interna do fluxo controlado.

O queuingStrategy

O segundo argumento, também opcional, do construtor ReadableStream() é queuingStrategy. É um objeto que, opcionalmente, define uma estratégia de enfileiramento para o stream, que pega um :

  • highWaterMark: um número não negativo de bytes que indica a marca d'água alta do fluxo usando essa estratégia de enfileiramento. Ele é usado para determinar a pressão de retorno, que se manifesta usando a propriedade ReadableByteStreamController.desiredSize adequada. Ele também determina quando o método pull() da origem é chamado.
. .

Os métodos getReader() e read()

Você pode ter acesso a um ReadableStreamBYOBReader definindo o parâmetro mode adequadamente: ReadableStream.getReader({ mode: "byob" }) Isso permite um controle mais preciso sobre o buffer alocação para evitar cópias. Para ler o stream de bytes, você precisa chamar ReadableStreamBYOBReader.read(view), em que view é um ArrayBufferView.

Exemplo de código de stream de bytes legível

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

A função a seguir retorna fluxos de bytes legíveis que permitem a leitura eficiente de cópia zero de um matriz gerada aleatoriamente. Em vez de usar um tamanho de fragmento predeterminado de 1.024, ele tenta preencher o buffer fornecido pelo desenvolvedor, possibilitando o controle total.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

A mecânica de um stream gravável

Um stream gravável é um destino em que é possível gravar dados, representados em JavaScript por uma Objeto WritableStream. Isso serve como uma abstração sobre um coletor subjacente, um coletor de E/S de nível inferior no qual dados brutos são gravados.

Os dados são gravados no stream por meio de um gravador, um bloco por vez. Um bloco pode pegar uma infinidade de formas, como os pedaços de um leitor. Use o código que quiser para produzir os pedaços prontos para serem gravados, entre o gravador e o código associado, é chamado de produtor.

Quando um escritor é criado e começa a escrever em um stream (um escritor ativo), diz-se que ele é bloqueado a ela. Apenas um gravador pode fazer gravações em um stream gravável por vez. Se você quiser outro gravador para começar a escrever em seu stream, você normalmente precisa liberá-lo antes de anexar outro escritor.

Uma fila interna rastreia os blocos que foram gravados no stream, mas ainda não processado pelo coletor subjacente.

Uma estratégia de enfileiramento é um objeto que determina como um stream deve sinalizar a contrapressão com base em o estado da fila interna dele. A estratégia de enfileiramento atribui um tamanho a cada bloco e compara o tamanho total de todos os fragmentos na fila para um número especificado, conhecido como marca alta.

A construção final é chamada de controlador. Cada stream gravável tem um controlador associado que permite que você controle a transmissão (por exemplo, para cancelá-la).

Como criar um stream gravável

A interface WritableStream dos a API Streams fornece uma abstração padrão para gravar dados de streaming em um destino, conhecido como um coletor. Esse objeto vem com contrapressão e enfileiramento integrados. Para criar um stream gravável, chamando o construtor WritableStream() Ele tem um parâmetro underlyingSink opcional, que representa um objeto com métodos e propriedades que definem como a instância de stream construída se comportará.

O underlyingSink

O underlyingSink pode incluir os seguintes métodos opcionais definidos pelo desenvolvedor. O controller passado a alguns dos métodos é uma WritableStreamDefaultController.

  • start(controller): este método é chamado imediatamente quando o objeto é construído. A o conteúdo desse método precisa ter acesso ao coletor subjacente. Se esse processo for se for realizado de modo assíncrono, ele pode retornar uma promessa para sinalizar o sucesso ou a falha.
  • write(chunk, controller): este método será chamado quando um novo bloco de dados (especificado no chunk) está pronta para ser gravada no coletor subjacente. Ele pode retornar uma promessa sinalizam o sucesso ou a falha da operação de gravação. Esse método será chamado somente após a tentativa gravações foram bem-sucedidas e nunca após o fluxo ser fechado ou cancelado.
  • close(controller): esse método será chamado se o app sinalizar que terminou de gravar blocos ao fluxo. O conteúdo deve fazer o que for necessário para finalizar as gravações no e liberar o acesso a ele. Se esse processo for assíncrono, ele pode retornar uma promessa de sinalizar o sucesso ou falha. Este método será chamado somente após todas as gravações na fila foram bem-sucedidos.
  • abort(reason): esse método será chamado se o app sinalizar que quer fechar abruptamente. no stream e colocá-lo em um estado de erro. Ele pode limpar quaisquer recursos mantidos, como close(), mas abort() será chamado mesmo que as gravações estejam na fila. Esses pedaços serão gerados embora. Se esse processo for assíncrono, ele poderá retornar uma promessa para sinalizar o sucesso ou a falha. A O parâmetro reason contém um DOMString que descreve por que o stream foi cancelado.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

A WritableStreamDefaultController interface da API Streams representa um controlador que permite o controle do estado de um WritableStream durante a configuração, já que mais blocos são enviados para gravação, ou no final do texto. Durante a construção um WritableStream, o coletor recebe um WritableStreamDefaultController correspondente para manipular. O WritableStreamDefaultController tem apenas um método: WritableStreamDefaultController.error(), o que causa erros em todas as interações futuras com o stream associado. WritableStreamDefaultController também oferece suporte a uma propriedade signal que retorna uma instância do AbortSignal, permitindo que uma operação WritableStream seja interrompida, se necessário.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

O queuingStrategy

O segundo argumento, também opcional, do construtor WritableStream() é queuingStrategy. É um objeto que, opcionalmente, define uma estratégia de enfileiramento para o stream, que usa dois parâmetros:

  • highWaterMark: um número não negativo que indica a marca d'água alta do fluxo usando essa estratégia de enfileiramento.
  • size(chunk): uma função que calcula e retorna o tamanho finito não negativo do valor do bloco especificado. O resultado é usado para determinar a pressão de retorno, que se manifesta usando a propriedade WritableStreamDefaultWriter.desiredSize adequada.
.

Os métodos getWriter() e write()

Para gravar em um stream gravável, você precisa de um gravador, que será WritableStreamDefaultWriter: O método getWriter() da interface WritableStream retorna uma nova instância de WritableStreamDefaultWriter e bloqueia o stream para essa instância. Enquanto o o stream está bloqueado, nenhum outro gravador pode ser adquirido até que o atual seja liberado.

O write() da classe WritableStreamDefaultWriter interface grava um bloco de dados transmitido em uma WritableStream e seu coletor subjacente e, em seguida, retorna uma promessa que é resolvida para indicar o sucesso ou a falha da operação de gravação. Observe que o que "sucesso" depende do coletor subjacente; isso pode indicar que o bloco foi aceito, e não necessariamente que estão salvos com segurança no destino final.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

A propriedade locked

Confira se um stream gravável está bloqueado acessando o WritableStream.locked .

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Exemplo de código de stream gravável

O exemplo de código abaixo mostra todas as etapas em ação.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Como direcionar um stream legível para um gravável

Um stream legível pode ser canalizado para um stream gravável por meio do método pipeTo(). O ReadableStream.pipeTo() canaliza o ReadableStream atual para um determinado WritableStream e retorna um promessa que será atendida quando o processo de piping for concluído com sucesso ou será rejeitada se houver algum erro. encontrados.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Como criar um fluxo de transformação

A interface TransformStream da API Streams representa um conjunto de dados transformáveis. Você crie um fluxo de transformação chamando o construtor TransformStream(), que cria e retorna um objeto de fluxo de transformação dos gerenciadores fornecidos. O construtor TransformStream() aceita como o primeiro argumento é um objeto JavaScript opcional que representa o transformer. Esses objetos podem contêm qualquer um dos seguintes métodos:

O transformer

  • start(controller): este método é chamado imediatamente quando o objeto é construído. Normalmente ele é usado para enfileirar blocos de prefixos usando controller.enqueue(). Esses blocos serão lidos no lado legível, mas não dependam de gravações no lado gravável. Se essa inicial processo é assíncrono, por exemplo, porque é necessário algum esforço para adquirir os blocos de prefixo, a função pode retornar uma promessa para sinalizar o sucesso ou a falha; uma promessa rejeitada causará um erro no riacho. Todas as exceções geradas serão geradas novamente pelo construtor TransformStream().
  • transform(chunk, controller): esse método é chamado quando um novo bloco gravado originalmente no o lado gravável está pronto para ser transformado. A implementação do stream garante que essa função será chamado somente depois que as transformações anteriores forem bem-sucedidas e nunca antes de start() concluído ou depois que flush() for chamado. Essa função executa a transformação real trabalho do fluxo de transformação. Ele pode enfileirar os resultados usando controller.enqueue(). Isso permite que um único bloco gravado no lado gravável resulte em zero ou vários blocos no lado legível, dependendo de quantas vezes controller.enqueue() é chamado. Se o processo de transformação é assíncrona, essa função pode retornar uma promessa para sinalizar o sucesso ou falha a transformação. Uma promessa rejeitada causará um erro nos lados legível e gravável da fluxo de transformação. Se nenhum método transform() for fornecido, será usada a transformação de identidade, o que enfileira blocos inalterados do lado gravável para o lado legível.
  • flush(controller): esse método é chamado depois que todos os blocos gravados no lado gravável foram transformado passando por transform(), e o lado gravável está prestes a ser fechadas. Normalmente, isso é usado para enfileirar blocos de sufixo para o lado legível, antes disso também é fechado. Se o processo de limpeza for assíncrono, a função pode retornar uma promessa para sinalizam sucesso ou falha; o resultado será comunicado ao autor da chamada stream.writable.write(): Além disso, uma promessa rejeitada causará um erro tanto as versões lados graváveis do stream. A geração de uma exceção é tratada da mesma forma que retornar uma mensagem promessa.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

As estratégias de enfileiramento writableStrategy e readableStrategy

O segundo e o terceiro parâmetros opcionais do construtor TransformStream() são opcionais Estratégias de enfileiramento writableStrategy e readableStrategy. Elas são definidas conforme descrito nas legível e gravável respectivamente.

Exemplo de código de fluxo de transformação

O exemplo de código a seguir mostra um fluxo de transformação simples em ação.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Como colocar um stream legível em um stream de transformação

O pipeThrough() da interface ReadableStream fornece uma maneira encadeada de encadear o fluxo atual usando um stream de transformação ou qualquer outro par gravável/legível. O pipe de um stream geralmente bloqueia pela duração do pipe, evitando que outros leitores o bloqueiem.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

O próximo exemplo de código (um pouco inventivo) mostra como implementar um "grito" versão de fetch() que coloca todo o texto em maiúsculas consumindo a promessa de resposta retornada como um stream e letras maiúsculas bloco por bloco. A vantagem dessa abordagem é que você não precisa esperar o documento inteiro para download, o que pode fazer uma enorme diferença ao lidar com arquivos grandes.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demonstração

A demonstração abaixo mostra streams legíveis, graváveis e de transformação em ação. Ele também inclui exemplos de cadeias de barra pipeThrough() e pipeTo(), além de demonstrar o tee(). Como opção, é possível executar a demonstração em sua própria janela ou visualizar código-fonte.

Streams úteis disponíveis no navegador

Há vários streams úteis integrados ao navegador. É fácil criar uma ReadableStream de um blob. O Blob o método stream() da interface retorna um ReadableStream que, após a leitura, retorna os dados contidos no blob. Lembre-se também de que O objeto File é um tipo específico de Blob e pode ser usada em qualquer contexto em que um blob pode.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

As variantes de streaming de TextDecoder.decode() e TextEncoder.encode() são chamadas TextDecoderStream e TextEncoderStream, respectivamente.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

A compactação ou descompactação de um arquivo é fácil com o CompressionStream e Fluxos de transformação DecompressionStream respectivamente. O exemplo de código abaixo mostra como você pode fazer o download da especificação Streams, compactá-la (gzip) no navegador e gravar o arquivo compactado diretamente no disco.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Os recursos da API File System Access FileSystemWritableFileStream e os fetch() streams de solicitação experimentais são exemplos de streams graváveis.

A API Serial faz uso intenso de streams legíveis e graváveis.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Por fim, a API WebSocketStream integra streams à API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Recursos úteis

Agradecimentos

Este artigo foi revisado por Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley e Adam Rice. As postagens do blog de Jake Archibald me ajudaram muito a entender córregos. Alguns dos exemplos de código são inspirados nos exemplos de código análises detalhadas de @bellbind e partes da prosa se baseiam fortemente Documentos da Web MDN em streams. A Streams Standard Os autores fizeram um trabalho incrível para escrever essa especificação. Imagem principal de Ryan Lara no Abrir a página.