Aprenda a usar fluxos legíveis, graváveis e de transformação com a API Streams.
A API Streams permite acessar programaticamente fluxos de dados recebidos pela rede ou criados localmente por qualquer meio e processá-los com JavaScript. O streaming envolve dividir um recurso que você quer receber, enviar ou transformar
em pequenos blocos e processar esses blocos pouco a pouco. Embora o streaming seja algo que os navegadores fazem de qualquer maneira ao receber recursos como HTML ou vídeos para serem mostrados em páginas da Web, esse recurso nunca esteve disponível para JavaScript antes de fetch
com streams ser introduzido em 2015.
Antes, se você quisesse processar um recurso de algum tipo (um vídeo, um arquivo de texto etc.), era necessário baixar o arquivo inteiro, esperar que ele fosse desserializado em um formato adequado e depois processá-lo. Com a disponibilidade de streams para JavaScript, tudo muda. Agora é possível processar dados brutos com JavaScript progressivamente assim que eles estiverem disponíveis no cliente, sem precisar gerar um buffer, uma string ou um blob. Isso desbloqueia vários casos de uso, alguns dos quais listei abaixo:
- Efeitos de vídeo:transmissão de um stream de vídeo legível por um stream de transformação que aplica efeitos em tempo real.
- (Des)compactação de dados:transmissão de um fluxo de arquivos por um fluxo de transformação que o (des)compacta seletivamente.
- Decodificação de imagens:transmissão de um fluxo de resposta HTTP por um fluxo de transformação que decodifica bytes em dados de bitmap e, em seguida, por outro fluxo de transformação que traduz bitmaps em PNGs. Se
instalado no gerenciador
fetch
de um service worker, isso permite fazer polyfill de forma transparente novos formatos de imagem, como AVIF.
Suporte ao navegador
ReadableStream e WritableStream
TransformStream
Principais conceitos
Antes de entrar em detalhes sobre os vários tipos de streams, vamos apresentar alguns conceitos básicos.
Pedaços
Um bloco é um único dado gravado ou lido de um fluxo. Ele pode ser de qualquer
tipo. Os fluxos podem até conter partes de tipos diferentes. Na maioria das vezes, um trecho não é a unidade de dados mais atômica para um determinado stream. Por exemplo, um fluxo de bytes pode conter blocos de 16
KiB unidades Uint8Array
, em vez de bytes únicos.
Streams legíveis
Um stream legível representa uma fonte de dados que pode ser lida. Em outras palavras, os dados saem de um fluxo legível. Concretamente, um stream legível é uma instância da classe ReadableStream
.
Streams graváveis
Um stream gravável representa um destino para dados em que você pode gravar. Em outras palavras, os dados entram em um fluxo gravável. Um fluxo gravável é uma instância da classe WritableStream
.
Transformar streams
Um stream de transformação consiste em um par de streams: um stream gravável, conhecido como lado gravável, e um stream legível, conhecido como lado legível.
Uma metáfora do mundo real para isso seria um intérprete simultâneo que traduz de um idioma para outro na hora.
De maneira específica para o stream de transformação, a gravação
no lado gravável resulta em novos dados disponíveis para leitura no lado
legível. Qualquer objeto com uma propriedade writable
e uma propriedade readable
pode servir como um fluxo de transformação. No entanto, a classe TransformStream
padrão facilita a criação de um par corretamente entrelaçado.
Cadeias de tubulação
Os fluxos são usados principalmente encadeando uns aos outros. Um fluxo legível pode ser transmitido diretamente
para um fluxo gravável, usando o método pipeTo()
do fluxo legível, ou pode ser transmitido por um
ou mais fluxos de transformação primeiro, usando o método pipeThrough()
do fluxo legível. Um conjunto de
fluxos conectados dessa forma é chamado de cadeia de pipes.
Limitação de capacidade
Depois que uma cadeia de pipes é construída, ela propaga sinais sobre a velocidade com que os blocos devem fluir por ela. Se alguma etapa da cadeia ainda não puder aceitar partes, ela vai propagar um sinal para trás pela cadeia de pipes até que a fonte original seja instruída a parar de produzir partes tão rapidamente. Esse processo de fluxo de normalização é chamado de contrapressão.
Teeing
Um fluxo legível pode ser ramificado (nomeado de acordo com o formato de um "T" maiúsculo) usando o método tee()
.
Isso vai bloquear o fluxo, ou seja, ele não poderá mais ser usado diretamente. No entanto, isso vai criar dois novos fluxos, chamados de ramificações, que podem ser consumidos de forma independente.
A ramificação também é importante porque os fluxos não podem ser rebobinados ou reiniciados. Falaremos mais sobre isso depois.
A mecânica de um fluxo legível
Um fluxo legível é uma fonte de dados representada em JavaScript por um objeto
ReadableStream
que
flui de uma fonte subjacente. O construtor
ReadableStream()
cria e retorna um objeto de fluxo legível dos manipuladores fornecidos. Há dois tipos de fonte subjacente:
- As fontes de push enviam dados constantemente quando você acessa o conteúdo. É sua responsabilidade iniciar, pausar ou cancelar o acesso ao stream. Exemplos incluem transmissões de vídeo ao vivo, eventos enviados pelo servidor ou WebSockets.
- As origens de extração exigem que você solicite dados explicitamente depois de se conectar a elas. Exemplos incluem operações HTTP por chamadas
fetch()
ouXMLHttpRequest
.
Os dados de stream são lidos sequencialmente em pequenas partes chamadas pedaços. Os blocos colocados em um stream são enfileirados. Isso significa que eles estão aguardando em uma fila para serem lidos. Uma fila interna acompanha os blocos que ainda não foram lidos.
Uma estratégia de enfileiramento é um objeto que determina como um fluxo deve sinalizar contrapressão com base no estado da fila interna. A estratégia de enfileiramento atribui um tamanho a cada bloco e compara o tamanho total de todos os blocos na fila a um número especificado, conhecido como marca de água alta.
Os blocos dentro do fluxo são lidos por um leitor. Esse leitor recupera os dados um bloco por vez, permitindo que você faça qualquer tipo de operação com eles. O leitor e o outro código de processamento que o acompanha são chamados de consumidor.
A próxima estrutura nesse contexto é chamada de controlador. Cada fluxo legível tem um controlador associado que, como o nome sugere, permite controlar o fluxo.
Apenas um leitor pode ler um stream por vez. Quando um leitor é criado e começa a ler um stream (ou seja, se torna um leitor ativo), ele é bloqueado para ele. Se você quiser que outro leitor assuma a leitura do seu stream, geralmente é necessário liberar o primeiro leitor antes de fazer qualquer outra coisa (embora seja possível dividir streams).
Como criar um stream legível
Você cria um fluxo legível chamando o construtor
ReadableStream()
.
O construtor tem um argumento opcional underlyingSource
, que representa um objeto
com métodos e propriedades que definem como a instância de fluxo construída vai se comportar.
O underlyingSource
Isso pode usar os seguintes métodos opcionais definidos pelo desenvolvedor:
start(controller)
: chamado imediatamente quando o objeto é construído. O método pode acessar a origem do fluxo e fazer qualquer outra coisa necessária para configurar a funcionalidade de streaming. Se esse processo for feito de forma assíncrona, o método poderá retornar uma promessa para sinalizar sucesso ou falha. O parâmetrocontroller
transmitido para esse método é umReadableStreamDefaultController
.pull(controller)
: pode ser usado para controlar o stream à medida que mais partes são buscadas. Ele é chamado repetidamente enquanto a fila interna de partes do fluxo não está cheia, até que a fila atinja a marca máxima. Se o resultado da chamadapull()
for uma promessa,pull()
não será chamado novamente até que essa promessa seja cumprida. Se a promessa for rejeitada, o stream vai gerar um erro.cancel(reason)
: chamado quando o consumidor de stream cancela o stream.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
O ReadableStreamDefaultController
é compatível com os seguintes métodos:
ReadableStreamDefaultController.close()
fecha o stream associado.ReadableStreamDefaultController.enqueue()
coloca uma determinada parte na fila do stream associado.ReadableStreamDefaultController.error()
causa erros em interações futuras com o stream associado.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
O queuingStrategy
O segundo argumento, também opcional, do construtor ReadableStream()
é queuingStrategy
.
É um objeto que define opcionalmente uma estratégia de enfileiramento para o stream, que usa dois parâmetros:
highWaterMark
: um número não negativo que indica a marca d'água alta do stream usando essa estratégia de enfileiramento.size(chunk)
: uma função que calcula e retorna o tamanho finito não negativo do valor de bloco especificado. O resultado é usado para determinar a contrapressão, que se manifesta pela propriedadeReadableStreamDefaultController.desiredSize
adequada. Ele também controla quando o métodopull()
da origem subjacente é chamado.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Os métodos getReader()
e read()
Para ler de um stream legível, você precisa de um leitor, que será um
ReadableStreamDefaultReader
.
O método getReader()
da interface ReadableStream
cria um leitor e bloqueia o stream para ele. Enquanto o fluxo está bloqueado, nenhum outro leitor pode ser adquirido até que este seja liberado.
O método read()
da interface ReadableStreamDefaultReader
retorna uma promessa que dá acesso ao próximo
pedaço na fila interna do fluxo. Ele atende ou rejeita com um resultado dependendo do estado do
fluxo. As diferentes possibilidades são as seguintes:
- Se um bloco estiver disponível, a promessa será cumprida com um objeto do formulário
{ value: chunk, done: false }
. - Se o fluxo for fechado, a promessa será cumprida com um objeto do formulário
{ value: undefined, done: true }
. - Se o stream apresentar um erro, a promessa será rejeitada com o erro relevante.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
A propriedade locked
Para verificar se um fluxo legível está bloqueado, acesse a propriedade
ReadableStream.locked
dele.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Exemplos de código de fluxo legível
O exemplo de código abaixo mostra todas as etapas em ação. Primeiro, crie um ReadableStream
que, no argumento underlyingSource
(ou seja, a classe TimestampSource
), defina um método start()
.
Esse método informa ao controller
da transmissão para
enqueue()
um carimbo de data/hora a cada segundo durante dez segundos.
Por fim, ele informa ao controlador para close()
o stream. Para consumir esse
fluxo, crie um leitor usando o método getReader()
e chame read()
até que o fluxo seja
done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iteração assíncrona
Verificar em cada iteração do loop read()
se o fluxo é done
pode não ser a API mais conveniente.
Felizmente, em breve haverá uma maneira melhor de fazer isso: a iteração assíncrona.
for await (const chunk of stream) {
console.log(chunk);
}
Uma solução alternativa para usar a iteração assíncrona hoje é implementar o comportamento com um polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Como criar um stream legível
O método tee()
da interface
ReadableStream
faz o tee da stream legível atual, retornando uma matriz de dois elementos
que contém as duas ramificações resultantes como novas instâncias de ReadableStream
. Isso permite que dois leitores leiam um fluxo simultaneamente. Por exemplo, é possível fazer isso em um service worker se
você quiser buscar uma resposta do servidor e transmiti-la para o navegador, mas também transmiti-la para o
cache do service worker. Como um corpo de resposta não pode ser consumido mais de uma vez, você precisa de duas cópias para fazer isso. Para cancelar o stream, você precisa cancelar as duas ramificações resultantes. Ao ramificar um fluxo, ele geralmente é bloqueado durante todo o processo, impedindo que outros leitores o bloqueiem.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Streams de bytes legíveis
Para fluxos que representam bytes, uma versão estendida do fluxo legível é fornecida para processar bytes de maneira eficiente, principalmente minimizando cópias. Os fluxos de bytes permitem adquirir leitores BYOB (bring-your-own-buffer). A implementação padrão pode gerar uma variedade de saídas diferentes, como strings ou buffers de matriz no caso de WebSockets, enquanto fluxos de bytes garantem a saída de bytes. Além disso, os leitores BYOB têm benefícios de estabilidade. Isso acontece porque, se um buffer for desanexado, ele poderá garantir que não será gravado duas vezes no mesmo buffer, evitando condições de corrida. Os leitores BYOB podem reduzir o número de vezes que o navegador precisa executar a coleta de lixo, porque podem reutilizar buffers.
Como criar um stream de bytes legível
É possível criar um fluxo de bytes legível transmitindo um parâmetro type
adicional para o
construtor ReadableStream()
.
new ReadableStream({ type: 'bytes' });
O underlyingSource
A fonte de um fluxo de bytes legível recebe um ReadableByteStreamController
para manipulação. O método ReadableByteStreamController.enqueue()
usa um argumento chunk
cujo valor é um ArrayBufferView
. A propriedade ReadableByteStreamController.byobRequest
retorna a solicitação
de BYOB atual ou nulo se não houver nenhuma. Por fim, a propriedade ReadableByteStreamController.desiredSize
retorna o tamanho desejado para preencher a fila interna do fluxo controlado.
O queuingStrategy
O segundo argumento, também opcional, do construtor ReadableStream()
é queuingStrategy
.
É um objeto que define opcionalmente uma estratégia de enfileiramento para o stream, que usa um parâmetro:
highWaterMark
: um número não negativo de bytes que indica a marca de alta qualidade do stream usando essa estratégia de enfileiramento. Isso é usado para determinar a contrapressão, que se manifesta pela propriedadeReadableByteStreamController.desiredSize
adequada. Ele também controla quando o métodopull()
da origem subjacente é chamado.
Os métodos getReader()
e read()
Em seguida, acesse um ReadableStreamBYOBReader
definindo o parâmetro mode
de acordo com o seguinte:
ReadableStream.getReader({ mode: "byob" })
. Isso permite um controle mais preciso sobre a alocação de buffer para evitar cópias. Para ler do fluxo de bytes, chame
ReadableStreamBYOBReader.read(view)
, em que view
é um
ArrayBufferView
.
Exemplo de código de fluxo de bytes legível
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
A função a seguir retorna fluxos de bytes legíveis que permitem uma leitura eficiente sem cópia de uma matriz gerada aleatoriamente. Em vez de usar um tamanho de bloco predeterminado de 1.024, ele tenta preencher o buffer fornecido pelo desenvolvedor, permitindo controle total.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
A mecânica de um stream gravável
Um fluxo gravável é um destino em que você pode gravar dados, representado em JavaScript por um objeto WritableStream
. Isso serve como uma abstração sobre um receptor subjacente, um receptor de E/S de nível mais baixo em que os dados brutos são gravados.
Os dados são gravados no fluxo por um gravador, um bloco por vez. Um trecho pode ter várias formas, assim como os trechos em um leitor. Você pode usar qualquer código para produzir os blocos prontos para escrita. O gravador e o código associado são chamados de produtor.
Quando um gravador é criado e começa a gravar em um stream (um gravador ativo), ele fica bloqueado para ele. Apenas um gravador pode gravar em um fluxo gravável por vez. Se você quiser que outro gravador comece a gravar no seu stream, geralmente é necessário liberar o stream antes de anexar outro gravador a ele.
Uma fila interna acompanha os blocos que foram gravados no fluxo, mas ainda não foram processados pelo coletor subjacente.
Uma estratégia de enfileiramento é um objeto que determina como um fluxo deve sinalizar contrapressão com base no estado da fila interna. A estratégia de enfileiramento atribui um tamanho a cada bloco e compara o tamanho total de todos os blocos na fila a um número especificado, conhecido como marca de água alta.
A construção final é chamada de controlador. Cada fluxo gravável tem um controlador associado que permite controlar o fluxo (por exemplo, para interrompê-lo).
Como criar um stream gravável
A interface WritableStream
da
API Streams fornece uma abstração padrão para gravar dados de streaming em um destino, conhecido
como um coletor. Esse objeto vem com contrapressão e enfileiramento integrados. Você cria um stream gravável chamando o construtor dele WritableStream()
.
Ele tem um parâmetro underlyingSink
opcional, que representa um objeto
com métodos e propriedades que definem como a instância de stream construída vai se comportar.
O underlyingSink
O underlyingSink
pode incluir os seguintes métodos opcionais definidos pelo desenvolvedor. O parâmetro controller
transmitido a alguns dos métodos é um WritableStreamDefaultController
.
start(controller)
: esse método é chamado imediatamente quando o objeto é construído. O conteúdo desse método deve ter como objetivo acessar o gravador subjacente. Se esse processo for feito de forma assíncrona, ele poderá retornar uma promessa para sinalizar sucesso ou falha.write(chunk, controller)
: esse método será chamado quando um novo bloco de dados (especificado no parâmetrochunk
) estiver pronto para ser gravado no coletor subjacente. Ela pode retornar uma promessa para indicar o sucesso ou a falha da operação de gravação. Esse método só será chamado depois que as gravações anteriores forem concluídas e nunca depois que o stream for fechado ou cancelado.close(controller)
: esse método será chamado se o app sinalizar que terminou de gravar pedaços no fluxo. O conteúdo precisa fazer o que for necessário para finalizar as gravações no coletor subjacente e liberar o acesso a ele. Se esse processo for assíncrono, ele poderá retornar uma promessa para sinalizar sucesso ou falha. Esse método será chamado somente depois que todas as gravações enfileiradas forem concluídas.abort(reason)
: esse método será chamado se o app indicar que quer fechar abruptamente o fluxo e colocá-lo em um estado de erro. Ele pode limpar todos os recursos mantidos, assim comoclose()
, masabort()
será chamado mesmo que as gravações estejam na fila. Esses pedaços serão descartados. Se esse processo for assíncrono, ele poderá retornar uma promessa para sinalizar sucesso ou falha. O parâmetroreason
contém umDOMString
que descreve por que o fluxo foi interrompido.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
A interface
WritableStreamDefaultController
da API Streams representa um controlador que permite controlar o estado de um WritableStream
durante a configuração, à medida que mais partes são enviadas para gravação ou no final da gravação. Ao construir
um WritableStream
, o gravador subjacente recebe uma instância WritableStreamDefaultController
correspondente para manipulação. O WritableStreamDefaultController
tem apenas um método:
WritableStreamDefaultController.error()
,
que causa erros em interações futuras com o fluxo associado.
WritableStreamDefaultController
também oferece suporte a uma propriedade signal
que retorna uma instância de
AbortSignal
,
permitindo que uma operação WritableStream
seja interrompida, se necessário.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
O queuingStrategy
O segundo argumento, também opcional, do construtor WritableStream()
é queuingStrategy
.
É um objeto que define opcionalmente uma estratégia de enfileiramento para o stream, que usa dois parâmetros:
highWaterMark
: um número não negativo que indica a marca d'água alta do stream usando essa estratégia de enfileiramento.size(chunk)
: uma função que calcula e retorna o tamanho finito não negativo do valor de bloco especificado. O resultado é usado para determinar a contrapressão, que se manifesta pela propriedadeWritableStreamDefaultWriter.desiredSize
adequada.
Os métodos getWriter()
e write()
Para gravar em um fluxo gravável, você precisa de um gravador, que será um
WritableStreamDefaultWriter
. O método getWriter()
da interface WritableStream
retorna uma nova instância de WritableStreamDefaultWriter
e bloqueia o fluxo para essa instância. Enquanto o
fluxo está bloqueado, nenhum outro gravador pode ser adquirido até que o atual seja liberado.
O método write()
da interface
WritableStreamDefaultWriter
grava um bloco de dados transmitido em um WritableStream
e no gravador subjacente. Em seguida, retorna
uma promessa que é resolvida para indicar o sucesso ou a falha da operação de gravação. O significado de "sucesso" depende do coletor de dados subjacente. Ele pode indicar que o bloco foi aceito, mas não necessariamente que foi salvo com segurança no destino final.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
A propriedade locked
Para verificar se um fluxo gravável está bloqueado, acesse a propriedade
WritableStream.locked
dele.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Exemplo de código de fluxo gravável
O exemplo de código abaixo mostra todas as etapas em ação.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Como redirecionar um stream legível para um gravável
Um fluxo legível pode ser transmitido para um fluxo gravável usando o método
pipeTo()
do fluxo legível.
ReadableStream.pipeTo()
transmite o ReadableStream
atual para um determinado WritableStream
e retorna uma
promessa que é cumprida quando o processo de transmissão é concluído com êxito ou rejeitada se ocorrerem erros.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Como criar um stream de transformação
A interface TransformStream
da API Streams representa um conjunto de dados transformáveis. Você
cria um fluxo de transformação chamando o construtor TransformStream()
, que cria e retorna
um objeto de fluxo de transformação dos manipuladores especificados. O construtor TransformStream()
aceita como primeiro argumento um objeto JavaScript opcional que representa o transformer
. Esses objetos podem
conter qualquer um dos seguintes métodos:
O transformer
start(controller)
: esse método é chamado imediatamente quando o objeto é construído. Normalmente, isso é usado para enfileirar partes de prefixo usandocontroller.enqueue()
. Esses blocos serão lidos do lado legível, mas não dependem de gravações no lado gravável. Se esse processo inicial for assíncrono, por exemplo, porque leva algum tempo para adquirir os blocos de prefixo, a função poderá retornar uma promessa para sinalizar sucesso ou falha. Uma promessa rejeitada vai gerar um erro no fluxo. Todas as exceções geradas serão geradas novamente pelo construtorTransformStream()
.transform(chunk, controller)
: esse método é chamado quando um novo bloco originalmente gravado no lado gravável está pronto para ser transformado. A implementação do stream garante que essa função será chamada somente depois que as transformações anteriores forem concluídas, e nunca antes destart()
ser concluído ou depois queflush()
for chamado. Essa função realiza o trabalho real de transformação do stream. É possível enfileirar os resultados usandocontroller.enqueue()
. Isso permite que um único bloco gravado no lado gravável resulte em zero ou vários blocos no lado legível, dependendo de quantas vezescontroller.enqueue()
é chamado. Se o processo de transformação for assíncrono, essa função poderá retornar uma promessa para sinalizar o sucesso ou a falha da transformação. Uma promessa rejeitada vai gerar um erro nos lados legíveis e graváveis do fluxo de transformação. Se nenhum métodotransform()
for fornecido, será usada a transformação de identidade, que coloca em fila blocos inalterados do lado gravável para o legível.flush(controller)
: esse método é chamado depois que todos os blocos gravados no lado gravável foram transformados ao passar portransform()
, e o lado gravável está prestes a ser fechado. Normalmente, isso é usado para enfileirar partes de sufixo no lado legível, antes que ele também seja fechado. Se o processo de limpeza for assíncrono, a função poderá retornar uma promessa para indicar sucesso ou falha. O resultado será comunicado ao autor da chamada destream.writable.write()
. Além disso, uma promessa rejeitada vai gerar um erro nos lados legíveis e graváveis do fluxo. Gerar uma exceção é tratado da mesma forma que retornar uma promessa rejeitada.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
As estratégias de enfileiramento writableStrategy
e readableStrategy
O segundo e o terceiro parâmetros opcionais do construtor TransformStream()
são as estratégias de enfileiramento opcionais writableStrategy
e readableStrategy
. Elas são definidas conforme descrito nas seções de fluxo legível e gravável, respectivamente.
Exemplo de código de transformação de stream
O exemplo de código a seguir mostra um fluxo de transformação simples em ação.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Como transmitir um stream legível por um stream de transformação
O método pipeThrough()
da interface ReadableStream
oferece uma maneira encadeável de transmitir o stream atual
por um stream de transformação ou qualquer outro par gravável/legível. O encadeamento de um fluxo geralmente o bloqueia durante a duração do encadeamento, impedindo que outros leitores o bloqueiem.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
O próximo exemplo de código (um pouco forçado) mostra como implementar uma versão "gritando" de fetch()
que coloca todo o texto em maiúsculas consumindo a promessa de resposta retornada
como um fluxo
e colocando em maiúsculas parte por parte. A vantagem dessa abordagem é que você não precisa esperar o download do documento inteiro, o que pode fazer uma grande diferença ao lidar com arquivos grandes.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demonstração
A demonstração abaixo mostra streams legíveis, graváveis e de transformação em ação. Ele também inclui exemplos
de cadeias de pipes pipeThrough()
e pipeTo()
, além de demonstrar tee()
. Você pode executar a demonstração em uma janela separada ou conferir o código-fonte.
Streams úteis disponíveis no navegador
Há vários fluxos úteis integrados ao navegador. É fácil criar um
ReadableStream
de um blob. O método stream() da interface Blob
retorna um ReadableStream
que, ao ser lido, retorna os dados contidos no blob. Lembre-se também de que um objeto File
é um tipo específico de Blob
e pode ser usado em qualquer contexto em que um blob pode.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
As variantes de streaming de TextDecoder.decode()
e TextEncoder.encode()
são chamadas de
TextDecoderStream
e
TextEncoderStream
, respectivamente.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Compactar ou descompactar um arquivo é fácil com os fluxos de transformação CompressionStream
e DecompressionStream
, respectivamente. O exemplo de código abaixo mostra como baixar a especificação Streams, compactá-la (gzip)
diretamente no navegador e gravar o arquivo compactado diretamente no disco.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
Os
FileSystemWritableFileStream
da API File System Access
e os fluxos de solicitação fetch()
experimentais são
exemplos de fluxos graváveis em uso.
A API Serial usa muito fluxos legíveis e graváveis.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Por fim, a API WebSocketStream
integra fluxos com a API WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Recursos úteis
- Especificação de streams
- Demonstrações complementares
- Polyfill de streams
- 2016: o ano dos fluxos da Web
- Iteradores e geradores assíncronos
- Visualizador de stream
Agradecimentos
Este artigo foi revisado por Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley e Adam Rice. As postagens do blog de Jake Archibald me ajudaram muito a entender streams. Alguns dos exemplos de código são inspirados nas explorações do usuário do GitHub @bellbind, e partes da prosa se baseiam muito nos MDN Web Docs sobre Streams (em inglês). Os autores do Streams Standard fizeram um trabalho incrível ao escrever esta especificação. Imagem principal de Ryan Lara no Unsplash.