Узнайте, как использовать доступные для чтения, записи и преобразования потоки с помощью API Streams.
API Streams позволяет программно получать доступ к потокам данных, полученным по сети или созданным локально любыми средствами, и обрабатывать их с помощью JavaScript. Потоковая передача данных подразумевает разбиение ресурса, который требуется получить, отправить или преобразовать, на небольшие фрагменты с последующей их пошаговой обработкой. Хотя браузеры и так используют потоковую передачу данных при получении таких ресурсов, как HTML или видео, для отображения на веб-страницах, эта возможность была доступна только в JavaScript до появления fetch
с помощью потоков в 2015 году.
Раньше, если нужно было обработать какой-либо ресурс (будь то видео, текстовый файл и т. д.), приходилось загружать весь файл, ждать, пока он будет десериализован в подходящий формат, а затем обрабатывать его. С появлением потоков в JavaScript всё изменилось. Теперь вы можете обрабатывать необработанные данные с помощью JavaScript постепенно, как только они становятся доступны на клиенте, без необходимости генерировать буфер, строку или двоичный объект. Это открывает ряд возможностей использования, некоторые из которых перечислены ниже:
- Видеоэффекты: передача читаемого видеопотока через поток преобразования, который применяет эффекты в реальном времени.
- (Де)сжатие данных: передача потока файлов через поток преобразования, который выборочно (де)сжимает его.
- Декодирование изображений: передача потока HTTP-ответа через поток преобразования, который декодирует байты в растровые данные, а затем через другой поток преобразования, который переводит растровые изображения в PNG. Установка внутри обработчика
fetch
сервис-воркера позволяет прозрачно применять полифиллы к новым форматам изображений, таким как AVIF.
Поддержка браузеров
ReadableStream и WritableStream
TransformStream
Основные концепции
Прежде чем подробно рассказать о различных типах потоков, позвольте мне представить некоторые основные концепции.
Куски
Фрагмент (чанк) — это отдельный фрагмент данных , записываемый в поток или считываемый из него. Он может быть любого типа; потоки могут даже содержать фрагменты разных типов. В большинстве случаев фрагмент не будет самой атомарной единицей данных для данного потока. Например, поток байтов может содержать фрагменты, состоящие из 16 КБ Uint8Array
, а не из отдельных байтов.
Читаемые потоки
Поток, доступный для чтения, представляет собой источник данных, из которого можно читать данные. Другими словами, данные поступают из потока, доступного для чтения. Конкретно, поток, доступный для чтения, — это экземпляр класса ReadableStream
.
Записываемые потоки
Поток с возможностью записи представляет собой место назначения для данных, в которое можно записывать данные. Другими словами, данные поступают в поток с возможностью записи. Конкретно, поток с возможностью записи — это экземпляр класса WritableStream
.
Трансформировать потоки
Поток преобразования состоит из пары потоков : потока с возможностью записи, называемого его записываемой стороной, и потока с возможностью чтения, называемого его читаемой стороной. В реальном мире это можно сравнить с синхронным переводчиком , который осуществляет динамический перевод с одного языка на другой. В отличие от потока преобразования, запись в записываемую сторону приводит к тому, что новые данные становятся доступными для чтения с читаемой стороны. В частности, любой объект с writable
и readable
свойствами может служить потоком преобразования. Однако стандартный класс TransformStream
упрощает создание такой пары, которая будет должным образом связана.
Трубчатые цепи
Потоки данных в основном используются для их соединения друг с другом. Поток данных для чтения может быть напрямую перенаправлен в поток данных для записи с помощью метода pipeTo()
потока для чтения или сначала пропущен через один или несколько потоков преобразования с помощью метода pipeThrough()
потока для чтения. Набор потоков, объединенных таким образом, называется цепочкой каналов.
Противодавление
После построения цепочки каналов она будет передавать сигналы о том, с какой скоростью должны проходить порции. Если какой-либо звено цепочки пока не может принимать порции, сигнал передается в обратном направлении по цепочке каналов, пока исходный источник не получит команду прекратить столь быструю передачу порций. Этот процесс нормализации потока называется противодавлением.
Тиинг
Поток, доступный для чтения, можно заблокировать (teed) (название образовано заглавной буквой «T») с помощью метода tee()
. Это заблокирует поток, то есть сделает его недоступным для непосредственного использования; однако при этом будут созданы два новых потока , называемых ветвями (branch), которые можно использовать независимо. Заблокировать потоки также важно, поскольку их нельзя перемотать или перезапустить, подробнее об этом позже.
Механика читаемого потока
Поток данных, доступный для чтения, — это источник данных в JavaScript, представленный объектом ReadableStream
, который поступает из базового источника. Конструктор ReadableStream()
создаёт и возвращает объект потока данных, доступный для чтения, из указанных обработчиков. Существует два типа базовых источников:
- Источники push-уведомлений постоянно отправляют вам данные при обращении к ним, и вы можете самостоятельно начать, приостановить или отменить доступ к потоку. Примерами могут служить прямые видеопотоки, события, отправленные сервером, или веб-сокеты.
- Источники данных pull требуют явного запроса данных после подключения. Примерами служат HTTP-операции через вызовы
fetch()
илиXMLHttpRequest
.
Потоковые данные считываются последовательно небольшими фрагментами, называемыми чанками . Фрагменты, помещенные в поток, называются поставленными в очередь . Это означает, что они ожидают чтения в очереди. Внутренняя очередь отслеживает ещё не прочитанные фрагменты.
Стратегия организации очереди — это объект, который определяет, как поток должен сигнализировать о противодавлении, основываясь на состоянии своей внутренней очереди. Стратегия организации очереди назначает размер каждому фрагменту и сравнивает общий размер всех фрагментов в очереди с заданным числом, известным как «верхняя граница» (high water mark) .
Фрагменты данных внутри потока считываются читателем . Этот читатель извлекает данные по одному фрагменту за раз, позволяя выполнять с ними любые необходимые операции. Читатель вместе с сопутствующим ему кодом обработки называется потребителем .
Следующая конструкция в этом контексте называется контроллером . С каждым доступным для чтения потоком связан контроллер, который, как следует из названия, позволяет управлять потоком.
Только один читатель может читать поток одновременно; когда читатель создаётся и начинает читать поток (то есть становится активным читателем ), он закрепляется за ним. Если вы хотите, чтобы другой читатель взял на себя чтение вашего потока, обычно необходимо освободить первого читателя, прежде чем выполнять какие-либо действия (хотя вы можете создавать потоки).
Создание читаемого потока
Читаемый поток создаётся путём вызова его конструктора ReadableStream()
. Конструктор имеет необязательный аргумент underlyingSource
, представляющий объект с методами и свойствами, определяющими поведение сконструированного экземпляра потока.
underlyingSource
При этом могут использоваться следующие необязательные методы, определяемые разработчиком:
-
start(controller)
: вызывается немедленно после создания объекта. Метод может получить доступ к источнику потока и выполнить любые другие действия, необходимые для настройки функциональности потока. Если этот процесс должен выполняться асинхронно, метод может вернуть обещание, сигнализирующее об успехе или неудаче. Параметрcontroller
, передаваемый этому методу, —ReadableStreamDefaultController
. -
pull(controller)
: может использоваться для управления потоком по мере извлечения новых фрагментов. Вызывается многократно до тех пор, пока внутренняя очередь фрагментов потока не заполнится, пока очередь не достигнет максимальной отметки. Если результатом вызоваpull()
является обещание,pull()
не будет вызван повторно, пока это обещание не будет выполнено. Если обещание будет отклонено, поток перейдет в режим ошибки. -
cancel(reason)
: вызывается, когда потребитель потока отменяет поток.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
поддерживает следующие методы:
-
ReadableStreamDefaultController.close()
закрывает связанный поток. -
ReadableStreamDefaultController.enqueue()
помещает заданный фрагмент в очередь связанного потока. -
ReadableStreamDefaultController.error()
приводит к ошибке любого будущего взаимодействия с соответствующим потоком.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Второй, также необязательный, аргумент конструктора ReadableStream()
— queuingStrategy
. Это объект, который опционально определяет стратегию организации очереди для потока. Он принимает два параметра:
-
highWaterMark
: неотрицательное число, указывающее наивысшую точку потока, использующего данную стратегию организации очереди. -
size(chunk)
: функция, которая вычисляет и возвращает конечный неотрицательный размер заданного значения фрагмента. Результат используется для определения противодавления, которое отображается через соответствующее свойствоReadableStreamDefaultController.desiredSize
. Он также определяет момент вызова методаpull()
базового источника.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Методы getReader()
и read()
Для чтения из потока, доступного для чтения, необходим читатель, которым будет ReadableStreamDefaultReader
. Метод getReader()
интерфейса ReadableStream
создаёт читатель и блокирует поток. Пока поток заблокирован, ни один другой читатель не может быть получен, пока этот не будет освобождён.
Метод read()
интерфейса ReadableStreamDefaultReader
возвращает обещание, предоставляющее доступ к следующему фрагменту во внутренней очереди потока. Результат выполнения или отклонения зависит от состояния потока. Возможны следующие варианты:
- Если фрагмент доступен, обещание будет выполнено с объектом формы
{ value: chunk, done: false }
. - Если поток закроется, обещание будет выполнено с объектом формы
{ value: undefined, done: true }
. - Если поток окажется ошибочным, обещание будет отклонено с соответствующей ошибкой.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
locked
собственность
Проверить, заблокирован ли читаемый поток, можно, обратившись к его свойству ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Примеры читаемого потокового кода
В примере кода ниже показаны все этапы в действии. Сначала вы создаёте ReadableStream
, в аргументе underlyingSource
которого (то есть в классе TimestampSource
) определяется метод start()
. Этот метод указывает controller
потока на необходимость помещать временную метку enqueue()
каждую секунду в течение десяти секунд. Затем он указывает контроллеру на close()
потока. Вы используете этот поток, создавая считыватель с помощью метода getReader()
и вызывая read()
до done
потока.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Асинхронная итерация
Проверка done
потока на каждой итерации цикла read()
может быть не самым удобным API. К счастью, скоро появится лучший способ сделать это: асинхронная итерация.
for await (const chunk of stream) {
console.log(chunk);
}
Обходным решением для использования асинхронной итерации сегодня является реализация поведения с помощью полифилла.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Создание читаемого потока
Метод tee()
интерфейса ReadableStream
создаёт tee для текущего читаемого потока, возвращая двухэлементный массив, содержащий две результирующие ветви в виде новых экземпляров ReadableStream
. Это позволяет двум читателям одновременно читать поток. Это можно сделать, например, в сервис-воркере, если нужно получить ответ с сервера и передать его браузеру, а также направить его в кэш сервис-воркера. Поскольку тело ответа не может быть использовано более одного раза, для этого требуются две копии. Чтобы отменить поток, необходимо отменить обе результирующие ветви. Создание tee для потока, как правило, блокирует его на время, не позволяя другим читателям заблокировать его.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Читаемые потоки байтов
Для потоков, представляющих байты, предоставляется расширенная версия читаемого потока для эффективной обработки байтов, в частности, за счёт минимизации количества копий. Байтовые потоки позволяют привлекать считыватели с использованием собственного буфера (BYOB). Реализация по умолчанию может предоставлять различные выходные данные, такие как строки или буферы массивов в случае WebSockets, тогда как байтовые потоки гарантируют байтовый вывод. Кроме того, считыватели BYOB обладают преимуществами в плане стабильности. Это связано с тем, что при отсоединении буфера гарантируется, что запись в один и тот же буфер не будет производиться дважды, что позволяет избежать состояния гонки. Считыватели BYOB могут сократить количество запусков сборки мусора браузером, поскольку могут повторно использовать буферы.
Создание читаемого потока байтов
Вы можете создать читаемый поток байтов, передав дополнительный параметр type
конструктору ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
Базовый источник читаемого потока байтов управляется контроллером ReadableByteStreamController
. Его метод ReadableByteStreamController.enqueue()
принимает аргумент chunk
значением которого является ArrayBufferView
. Свойство ReadableByteStreamController.byobRequest
возвращает текущий запрос на извлечение BYOB или null, если его нет. Наконец, свойство ReadableByteStreamController.desiredSize
возвращает желаемый размер для заполнения внутренней очереди управляемого потока.
queuingStrategy
Второй, также необязательный, аргумент конструктора ReadableStream()
— queuingStrategy
. Это объект, который опционально определяет стратегию организации очереди для потока и принимает один параметр:
-
highWaterMark
: неотрицательное число байтов, указывающее максимальную отметку потока, использующего данную стратегию организации очереди. Это значение используется для определения обратного давления, которое отображается через соответствующее свойствоReadableByteStreamController.desiredSize
. Оно также определяет момент вызова методаpull()
базового источника.
Методы getReader()
и read()
Затем вы можете получить доступ к ReadableStreamBYOBReader
, задав соответствующий параметр mode
: ReadableStream.getReader({ mode: "byob" })
. Это позволяет более точно контролировать выделение буфера, избегая копирования. Для чтения из потока байтов необходимо вызвать ReadableStreamBYOBReader.read(view)
, где view
— это ArrayBufferView
.
Пример кода читаемого потока байтов
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Следующая функция возвращает читаемые потоки байтов, позволяющие эффективно читать случайно сгенерированный массив без копирования. Вместо использования предопределённого размера фрагмента 1024 она пытается заполнить буфер, предоставленный разработчиком, что обеспечивает полный контроль.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Механика записываемого потока
Поток с возможностью записи — это место назначения, куда можно записывать данные. В JavaScript он представлен объектом WritableStream
. Он служит абстракцией над базовым приёмником — низкоуровневым приёмником ввода-вывода, в который записываются необработанные данные.
Данные записываются в поток через модуль записи (writer) , по одному фрагменту за раз. Фрагмент может принимать множество форм, как и фрагменты в модуле чтения (reader). Вы можете использовать любой код для подготовки фрагментов к записи; модуль записи (writer) и связанный с ним код называются модулем создания (producer ).
Когда создаётся объект записи и начинает запись в поток ( активный объект записи ), он считается заблокированным . В один и тот же поток, доступный для записи, может одновременно записывать только один объект записи. Если вы хотите, чтобы другой объект записи начал запись в ваш поток, обычно необходимо освободить его, прежде чем подключать к нему другой объект записи.
Внутренняя очередь отслеживает фрагменты, записанные в поток, но еще не обработанные базовым приемником.
Стратегия организации очереди — это объект, который определяет, как поток должен сигнализировать о противодавлении, основываясь на состоянии своей внутренней очереди. Стратегия организации очереди назначает размер каждому фрагменту и сравнивает общий размер всех фрагментов в очереди с заданным числом, известным как «верхняя граница» (high water mark) .
Последняя конструкция называется контроллером . С каждым записываемым потоком связан контроллер, позволяющий управлять потоком (например, прерывать его).
Создание записываемого потока
Интерфейс WritableStream
API Streams предоставляет стандартную абстракцию для записи потоковых данных в приемник, известный как приемник. Этот объект обладает встроенными функциями обратного давления и организации очередей. Поток, доступный для записи, создаётся путём вызова его конструктора WritableStream()
. Он имеет необязательный параметр underlyingSink
, представляющий собой объект с методами и свойствами, определяющими поведение созданного экземпляра потока.
underlyingSink
underlyingSink
может включать следующие необязательные методы, определяемые разработчиком. Параметр controller
, передаваемый некоторым методам, — это WritableStreamDefaultController
.
-
start(controller)
: этот метод вызывается сразу после создания объекта. Содержимое этого метода должно быть направлено на получение доступа к базовому приемнику. Если этот процесс выполняется асинхронно, он может вернуть обещание, сигнализирующее об успехе или неудаче. -
write(chunk, controller)
: этот метод будет вызван, когда новый фрагмент данных (указанный в параметреchunk
) будет готов к записи в базовый приемник. Он может вернуть обещание, сигнализирующее об успешности или неудаче операции записи. Этот метод будет вызван только после успешного выполнения предыдущих операций записи и никогда после закрытия или прерывания потока. -
close(controller)
: этот метод будет вызван, если приложение сообщит о завершении записи фрагментов в поток. Содержимое должно выполнить все необходимые действия для завершения записи в базовый приемник и освободить к нему доступ. Если этот процесс асинхронный, он может вернуть обещание, сигнализирующее об успехе или неудаче. Этот метод будет вызван только после успешного завершения всех операций записи в очереди. -
abort(reason)
: этот метод будет вызван, если приложение сигнализирует о желании внезапно закрыть поток и перевести его в состояние ошибки. Он может очистить любые захваченные ресурсы, подобно методуclose()
, ноabort()
будет вызван даже при наличии очереди записей. Эти фрагменты будут удалены. Если этот процесс асинхронный, он может вернуть обещание, сигнализирующее об успехе или неудаче. Параметрreason
содержитDOMString
, описывающий причину прерывания потока.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Интерфейс WritableStreamDefaultController
API потоков представляет собой контроллер, позволяющий управлять состоянием потока WritableStream
во время настройки, по мере отправки новых фрагментов данных на запись или по завершении записи. При создании потока WritableStream
базовому приемнику данных предоставляется соответствующий экземпляр WritableStreamDefaultController
для управления. У WritableStreamDefaultController
есть только один метод: WritableStreamDefaultController.error()
, который приводит к ошибке любого последующего взаимодействия с соответствующим потоком. WritableStreamDefaultController
также поддерживает свойство signal
, возвращающее экземпляр AbortSignal
, что позволяет при необходимости остановить операцию WritableStream
.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Второй, также необязательный, аргумент конструктора WritableStream()
— queuingStrategy
. Это объект, который опционально определяет стратегию организации очереди для потока. Он принимает два параметра:
-
highWaterMark
: неотрицательное число, указывающее наивысшую точку потока, использующего данную стратегию организации очереди. -
size(chunk)
: функция, которая вычисляет и возвращает конечный неотрицательный размер заданного значения фрагмента. Результат используется для определения обратного давления, которое отображается через соответствующее свойствоWritableStreamDefaultWriter.desiredSize
.
Методы getWriter()
и write()
Для записи в поток, доступный для записи, необходим объект записи, которым будет WritableStreamDefaultWriter
. Метод getWriter()
интерфейса WritableStream
возвращает новый экземпляр WritableStreamDefaultWriter
и блокирует поток с этим экземпляром. Пока поток заблокирован, ни один другой объект записи не может быть запрошен, пока текущий не будет освобождён.
Метод write()
интерфейса WritableStreamDefaultWriter
записывает переданный фрагмент данных в WritableStream
и его базовый приемник, а затем возвращает обещание, которое разрешается, чтобы указать на успешность или неуспешность операции записи. Обратите внимание, что «успех» определяется базовым приемником; он может означать, что фрагмент был принят, но не обязательно, что он безопасно сохранён в конечном месте назначения.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
locked
собственность
Проверить, заблокирован ли доступный для записи поток, можно, обратившись к его свойству WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Пример кода записываемого потока
В примере кода ниже показаны все шаги в действии.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Передача читаемого потока в записываемый поток
Поток, доступный для чтения, можно перенаправить в поток, доступный для записи, с помощью метода pipeTo()
потока, доступного для чтения. Метод ReadableStream.pipeTo()
перенаправляет текущий поток ReadableStream
в заданный WritableStream
и возвращает обещание, которое выполняется при успешном завершении процесса перенаправки или отклоняется в случае возникновения ошибок.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Создание потока преобразования
Интерфейс TransformStream
API Streams представляет собой набор преобразуемых данных. Поток преобразования создаётся путём вызова его конструктора TransformStream()
, который создаёт и возвращает объект потока преобразования из заданных обработчиков. Конструктор TransformStream()
принимает в качестве первого аргумента необязательный объект JavaScript, представляющий transformer
. Такие объекты могут содержать любой из следующих методов:
transformer
-
start(controller)
: этот метод вызывается сразу после создания объекта. Обычно он используется для постановки префиксных фрагментов в очередь с помощьюcontroller.enqueue()
. Эти фрагменты будут считываться с доступной для чтения стороны, но не зависят от записи на доступную для записи сторону. Если этот начальный процесс асинхронный, например, из-за того, что получение префиксных фрагментов требует определённых усилий, функция может вернуть обещание, сигнализирующее об успехе или неудаче; отклонённое обещание приведёт к ошибке потока. Любые выданные исключения будут повторно выданы конструкторомTransformStream()
. -
transform(chunk, controller)
: этот метод вызывается, когда новый фрагмент, изначально записанный на записываемую сторону, готов к преобразованию. Реализация потока гарантирует, что эта функция будет вызвана только после успешного выполнения предыдущих преобразований, и никогда до завершенияstart()
или после вызоваflush()
. Эта функция выполняет фактическую работу по преобразованию потока преобразования. Она может ставить результаты в очередь с помощьюcontroller.enqueue()
. Это позволяет одному фрагменту, записанному на записываемую сторону, приводить к появлению нуля или нескольких фрагментов на читаемой стороне, в зависимости от того, сколько раз вызываетсяcontroller.enqueue()
. Если процесс преобразования асинхронный, эта функция может вернуть обещание, сигнализирующее об успешности или неудаче преобразования. Отклоненное обещание приведет к ошибке как читаемой, так и записываемой сторон потока преобразования. Если методtransform()
не предоставлен, используется тождественное преобразование, которое ставит неизмененные фрагменты с записываемой стороны на читаемую сторону. -
flush(controller)
: этот метод вызывается после того, как все фрагменты, записанные на записываемую сторону, были преобразованы путём успешного прохожденияtransform()
, и записываемая сторона готова к закрытию. Обычно это используется для постановки суффиксных фрагментов на читаемую сторону, прежде чем она тоже будет закрыта. Если процесс очистки асинхронный, функция может вернуть обещание, сигнализирующее об успехе или неудаче; результат будет передан вызывающему методstream.writable.write()
. Кроме того, отклонённое обещание приведёт к ошибке как на читаемой, так и на записываемой стороне потока. Вызов исключения обрабатывается так же, как возврат отклонённого обещания.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Стратегии очередей writableStrategy
и readableStrategy
Второй и третий необязательные параметры конструктора TransformStream()
— это необязательные стратегии организации очереди writableStrategy
и readableStrategy
. Они определены, как описано в разделах, посвященных потоку с возможностью чтения и записи соответственно.
Пример кода преобразования потока
В следующем примере кода показан простой поток преобразования в действии.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Передача читаемого потока через поток преобразования
Метод pipeThrough()
интерфейса ReadableStream
предоставляет цепочечный способ передачи текущего потока через поток преобразования или любую другую пару, допускающую запись/чтение. Передача потока по конвейеру обычно блокирует его на время работы канала, предотвращая его блокировку другими читателями.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Следующий пример кода (немного надуманный) показывает, как можно реализовать «кричащую» версию функции fetch()
, которая преобразует весь текст в верхний регистр, используя возвращаемый ответный обещание как поток и преобразуя его в верхний регистр по частям. Преимущество такого подхода заключается в том, что вам не нужно ждать загрузки всего документа, что может иметь огромное значение при работе с большими файлами.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Демо
Демонстрация ниже демонстрирует работу потоков, доступных для чтения, записи и преобразования. Она также включает примеры цепочек каналов pipeThrough()
и pipeTo()
, а также демонстрирует tee()
. Вы можете запустить демонстрацию в отдельном окне или просмотреть исходный код .
Полезные трансляции доступны в браузере
В браузер встроено несколько полезных потоков. Вы можете легко создать ReadableStream
из BLOB-объекта. Метод stream() интерфейса Blob
возвращает ReadableStream
, который при чтении возвращает данные, содержащиеся в BLOB-объекте. Также напомним, что объект File
— это особый вид Blob
, который может использоваться в любом контексте, в котором может использоваться BLOB-объект.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Потоковые варианты TextDecoder.decode()
и TextEncoder.encode()
называются TextDecoderStream
и TextEncoderStream
соответственно.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Сжатие и распаковка файла легко выполняются с помощью потоков преобразования CompressionStream
и DecompressionStream
соответственно. В примере кода ниже показано, как загрузить спецификацию Streams, сжать её (gzip) прямо в браузере и записать сжатый файл непосредственно на диск.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
FileSystemWritableFileStream
API доступа к файловой системе и экспериментальные потоки запросов fetch()
являются примерами доступных для записи потоков в реальной жизни.
Последовательный API активно использует как читаемые, так и записываемые потоки.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Наконец, API WebSocketStream
интегрирует потоки с API WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Полезные ресурсы
- Спецификация потоков
- Сопутствующие демонстрации
- Потоки полифилла
- 2016 год — год веб-трансляций
- Асинхронные итераторы и генераторы
- Визуализатор потока
Благодарности
Эту статью рецензировали Джейк Арчибальд , Франсуа Бофорт , Сэм Даттон , Маттиас Буэленс , Сурма , Джо Медли и Адам Райс . Публикации Джейка Арчибальда в блоге очень помогли мне в понимании потоков. Некоторые примеры кода вдохновлены исследованиями пользователя GitHub @bellbind , а часть текста во многом основана на документах MDN Web Docs по потокам . Авторы Streams Standard проделали колоссальную работу по написанию этой спецификации. Изображение главного героя предоставлено Райаном Ларой на Unsplash .