Подробное руководство по Streams API
Использование читаемых, записываемых и преобразующих потоков с помощью Streams API.
Streams API позволяет программно получать доступ к потокам данных, принятым по сети или созданным локально каким-либо способом и обрабатывать их, используя JavaScript. Потоковая обработка включает в себя разбиение ресурса, который нужно получить, отправить или преобразовать, на небольшие фрагменты и их побитовую обработку. Эту задачу обычно решают браузеры при получении ресурсов (например, HTML или видео) для отображения на странице, однако в JavaScript такая возможность появилась только с реализацией fetch
с потоками в 2015 году.
Раньше, чтобы обработать какой-либо ресурс (видео, текстовый файл и т. д.), приходилось скачивать весь файл, дожидаться его десериализации в подходящий формат и только потом приступать к собственно обработке. Теперь работа с потоками реализована JavaScript, и всё стало намного проще. Первичные данные можно обрабатывать постепенно, как только они появляются на клиенте, и не нужно создавать буфер, строку или BLOB. Это открывает различные возможности, некоторые из которых я перечисляю ниже:
- Видеоэффекты: передача читаемого видеопотока через преобразующий поток, который применяет эффекты в реальном времени.
- Сжатие (распаковка) данных: передача файлового потока через преобразующий поток, который выборочно сжимает или распаковывает его.
- Декодирование изображений: передача потока HTTP-ответа последовательно через два преобразующих потока, первый из которых декодирует байты в данные битовой карты, а второй переводит ее в PNG-изображения. При установке внутри обработчика
fetch
сервис-воркера это позволяет прозрачно полифилить новые форматы изображений, например AVIF.
Основные понятия #
Прежде чем подробно разбирать различные типы потоков, познакомимся с основными понятиями.
Фрагменты #
Фрагмент — это один элемент данных, который записывается в поток или или читается из него. Он может быть любого типа; в потоках могут даже быть фрагменты разных типов. В большинстве случаев фрагмент не будет самой элементарной единицей данных для конкретного потока. Например, байтовый поток может содержать фрагменты, состоящие из элементов Uint8Array
по 16 КиБ, а не отдельных байтов.
Читаемые потоки #
Читаемый поток — это источник данных, из которого можно читать. Другими словами, из читаемого потока данные выходят. Говоря конкретнее, читаемый поток — это экземпляр класса ReadableStream
.
Записываемые потоки #
Записываемый поток — это пункт назначения для данных, в который можно писать. Другими словами, в записываемый поток данные входят. Записываемый поток — это экземпляр класса WritableStream
.
Преобразующие потоки #
Преобразующий поток состоит из пары потоков: записываемого потока («записываемая сторона») и читаемого потока («читаемая сторона»). В качестве метафоры можно представить себе синхронного переводчика, который переводит с одного языка на другой на лету. Запись на записываемую сторону приводит к тому, что с читаемой стороны становятся доступными новые данные — причем сам процесс зависит от преобразующего потока. Говоря конкретнее, преобразующим потоком может служить любой объект со свойствами writable
и readable
. Однако стандартный класс TransformStream
упрощает создание такой надлежащим образом связанной пары.
Цепочки перенаправления #
Потоки обычно используются посредством перенаправления друг в друга. Читаемый поток можно с помощью метода pipeTo()
направить непосредственно в записываемый поток или с помощью метода pipeThrough()
сначала пропустить через один или несколько преобразующих потоков. Набор соединенных таким образом потоков называется цепочкой перенаправления.
Обратная реакция #
Собранная цепочка перенаправления будет распространять сигналы о том, как быстро фрагменты должны через нее проходить. Если какой-либо элемент цепочки еще не может принимать фрагменты, она передает сигнал обратно, пока в конечном итоге исходный источник не получит указание прекратить такое быстрое производство фрагментов. Этот процесс нормализации потока называется обратной реакцией.
Раздвоение #
Читаемый поток можно «раздвоить» с помощью метода tee()
— в результате он заблокируется и его нельзя будет использовать напрямую, но зато появятся два новых потока — так называемые «ветви», которые можно использовать самостоятельно. Раздвоение — важный процесс, поскольку потоки нельзя перемотать или перезапустить (подробнее об этом позже).
Принцип действия читаемого потока #
Читаемый поток — это источник данных, представленный в JavaScript как объект ReadableStream
, поступающий из базового источника. Конструктор ReadableStream()
создает и возвращает объект читаемого потока из указанных обработчиков. Базовые источники бывают двух типов:
- Активные источники отправляют данные постоянно, когда вы к ним обращаетесь, и вы решаете, запускать, приостанавливать или отменять доступ к потоку. Это может быть потоковое видео, отправленные сервером события или подключения WebSocket.
- Пассивные источники требуют, чтобы после подключения у них явным образом запрашивали данные. Это могут быть операции HTTP-операции через вызовы
fetch()
илиXMLHttpRequest
.
Данные потока читаются последовательно небольшими частями — фрагментами. Помещенные в поток фрагменты считаются поставленными в очередь. Это означает, что они ждут прочтения в очереди. Непрочитанные фрагменты отслеживаются во внутренней очереди.
Стратегия организации очереди — это объект, определяющий, как поток должен давать обратную реакцию на основе состояния своей внутренней очереди. Стратегия организации очереди назначает каждому фрагменту размер и сравнивает общий размер всех фрагментов в очереди с заданным числом — максимальным уровнем.
Фрагменты внутри потока читаются считывателем. Он извлекает данные по одному фрагменту за раз, что позволяет выполнять с ними любые операции. Считыватель совместно с кодом обработки называется потребителем.
Следующий объект в этом контексте называется контроллер. У каждого читаемого потока есть связанный с ним контроллер, который, как следует из названия, позволяет контролировать этот поток.
Одновременно читать поток может только один считыватель; как только считыватель создан и начинает читать поток (становится активным), он прикрепляется к потоку. Чтобы чтением потока занялся другой считыватель, обычно необходимо освободить первый считыватель (хотя можно еще раздвоить поток).
Создание читаемого потока #
Чтобы создать читаемый поток, нужно вызвать его конструктор ReadableStream()
. У конструктора есть необязательный аргумент underlyingSource
— объект с методами и свойствами, определяющими поведение создаваемого экземпляра потока.
Объект underlyingSource
#
Здесь можно использовать необязательные методы, определяемые разработчиком:
- Метод
start(controller)
: вызывается сразу после создания объекта. Может обращаться к источнику потока и делать всё необходимое для настройки функциональности потока. Если этот процесс должен выполняться асинхронно, метод может вернуть промис для сигнализации о завершении операции или сбое. Передаваемый в метод параметрcontroller
— этоReadableStreamDefaultController
. - Метод
pull(controller)
: может использоваться для управления потоком по мере получения фрагментов. Вызывается повторно, пока внутренняя очередь фрагментов потока не заполнится — то есть, пока очередь не достигнет максимального уровня. Если результат вызоваpull()
— промис, тоpull()
не будет вызываться, пока промис не будет выполнен. Если промис отклонен, поток перейдет в состояние ошибки. - Метод
cancel(reason)
: вызывается, когда потребитель отменяет поток.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
Контроллер ReadableStreamDefaultController
поддерживает следующие методы:
ReadableStreamDefaultController.close()
закрывает соответствующий поток.ReadableStreamDefaultController.enqueue()
ставит данный фрагмент в очередь соответствующего потока.ReadableStreamDefaultController.error()
вызывает ошибку при любом последующем взаимодействии с соответствующим потоком.
/* … */
start(controller) {
controller.enqueue('Первый фрагмент!');
},
/* … */
Аргумент queuingStrategy
#
Второй (тоже необязательный) аргумент конструктора ReadableStream()
— queuingStrategy
. Это объект, дополнительно задающий стратегию организации очереди для потока. Принимает два параметра:
highWaterMark
— неотрицательное число, указывающее максимальный уровень потока, использующего эту стратегию организации очереди;size(chunk)
— функция, которая вычисляет и возвращает конечный неотрицательный размер заданного значения фрагмента. Результат используется для определения обратной реакции, отражаемой в соответствующем свойствеReadableStreamDefaultController.desiredSize
. Также определяет, когда вызывается методpull()
базового источника.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Методы getReader()
и read()
#
Чтобы читать из читаемого потока, нужен считыватель — ReadableStreamDefaultReader
. Метод getReader()
интерфейса ReadableStream
создает считыватель и блокирует для него поток. Во время блокировки получить еще один считыватель нельзя, пока текущий не будет освобожден.
Метод read()
интерфейса ReadableStreamDefaultReader
возвращает промис, предоставляющий доступ к следующему фрагменту во внутренней очереди потока. Он выполняет или отклоняет результат в зависимости от состояния потока. Возможны следующие варианты:
- Если фрагмент доступен, промис будет выполнен с объектом вида
{ value: chunk, done: false }
. - Если поток станет закрытым, промис будет выполнен с объектом вида
{ value: undefined, done: true }
. - Если поток перейдет в состояние ошибки, промис будет отклонен с соответствующей ошибкой.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('Поток завершен.');
break;
}
console.log('Прочитан фрагмент:', value);
}
Свойство locked
#
Что проверить, заблокирован ли читаемый поток, используйте его свойство ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`Поток ${locked ? 'indeed' : 'не'} заблокирован.`);
Примеры кода с читаемым потоком #
В примере кода ниже показаны все этапы в действии. Сначала вы создаете ReadableStream
, который в аргументе underlyingSource
(класс TimestampSource
) определяет метод start()
. Этот метод указывает контроллеру (controller
), что каждую секунду в течение десяти секунд нужно ставить в очередь (enqueue()
) метку времени. В конце он указывает контроллеру закрыть (close()
) поток. Чтобы потреблять поток, методом getReader()
создается считыватель, после чего вызывается read()
, пока поток не будет done
(завершен).
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Добавить строку в поток.
controller.enqueue(string);
console.log(`Поставлено в очередь: ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Закрыть поток через 10 с.
controller.close();
}, 10_000);
}
cancel() {
// Вызывается, когда считыватель выполняет отмену.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// Метод `read()` возвращает промис, который
// разрешается, когда получено значение.
const { done, value } = await reader.read();
// Объекты результата содержат два свойства:
// `done` — `true`, если поток уже выдал все свои данные.
// `value` — данные. Всегда `undefined`, если `done` — `true`.
if (done) return result;
result += value;
console.log(`Прочитано символов на данный момент: ${result.length}`);
console.log(`Последний прочитанный фрагмент: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Поток завершен', result));
Асинхронное итерирование #
Проверять на каждой итерации цикла read()
значение done
потока — пожалуй, не самый удобный API. К счастью, скоро появится более удобный способ делать это — асинхронное итерирование.
for await (const chunk of stream) {
console.log(chunk);
}
А пока что использовать асинхронное итерирование можно с помощью вспомогательной функции. Это позволяет использовать эту функцию в коде, как показано ниже.
function streamAsyncIterator(stream) {
// Получаем блокировку на потоке:
const reader = stream.getReader();
return {
next() {
// Операции чтения потока уже разрешаются с помощью {done, value}, поэтому
// можно просто вызвать `read()`:
return reader.read();
},
return() {
// Снять блокировку, если итератор прекращает работу.
reader.releaseLock();
return {};
},
// Выражение «for await» делает вызов на всём, что было передано, поэтому
// итераторы обычно возвращают себя.
[Symbol.asyncIterator]() {
return this;
},
};
}
async function example() {
const response = await fetch(url);
for await (const chunk of streamAsyncIterator(response.body)) {
console.log(chunk);
}
}
Раздвоение читаемого потока #
Метод tee()
интерфейса ReadableStream
раздваивает текущий читаемый поток и возвращает двухэлементный массив с двумя ветвями — новыми экземплярами ReadableStream
. Это позволяет двум считывателем читать один поток одновременно. Это может пригодиться, например, для сервис-воркера, когда нужно получить ответ от сервера, передать его браузеру и одновременно отправить потоком в кеш сервис-воркера. Тело ответа нельзя использовать более одного раза, поэтому здесь понадобятся две копии. Чтобы отменить поток, нужно отменить обе полученные ветви. При раздвоении потока он обычно блокируется, что не дает другим считывателям его заблокировать.
const readableStream = new ReadableStream({
start(controller) {
// Вызывается конструктором.
console.log('[запуск]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Вызываем `read()`, когда очередь контроллера пуста.
console.log('[извлечение]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Вызывается при отмене потока.
console.log('[отмена]', reason);
},
});
// Создаем два файла `ReadableStream`.
const [streamA, streamB] = readableStream.tee();
// Читаем streamA итеративно один за другим. Обычно
// так не делается, но это возможно.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Читаем streamB в цикле. Это более привычный способ
// чтения данных из потока.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Читаемые байтовые потоки #
Если поток представляет байты, используется расширенная версия читаемого потока, которая обеспечивает эффективную обработку байтов, в частности, путем минимизации числа копий. Байтовые потоки позволяют получать считыватели собственного буфера (BYOB). Реализация по умолчанию может давать различные данные на выходе, например буферы строк и массивов в случае WebSocket, а байтовые потоки всегда дают байтовый вывод. Кроме того, у BYOB-считывателей есть преимущество в стабильности. Если буфер отключается, запись в один и тот же буфер не будет проводиться дважды, что позволяет избежать состояния гонки. BYOB-считыватели могут сократить количество запусков сборки мусора браузером, поскольку они умеют использовать буферы повторно.
Создание читаемого байтового потока #
Чтобы создать читаемый байтовый поток, передайте дополнительный параметр type
в конструктор ReadableStream()
.
new ReadableStream({ type: 'bytes' });
Объект underlyingSource
#
Базовому источнику читаемого байтового потока для манипуляций дается контроллер ReadableByteStreamController
. Его метод ReadableByteStreamController.enqueue()
требует аргумент chunk
, значение которого представляет собой ArrayBufferView
. Свойство ReadableByteStreamController.byobRequest
возвращает текущий запрос BYOB на извлечение или null, если его нет. Наконец, свойство ReadableByteStreamController.desiredSize
возвращает желаемый размер для заполнения внутренней очереди контролируемого потока.
Аргумент queuingStrategy
#
Второй (тоже необязательный) аргумент конструктора ReadableStream()
— queuingStrategy
. Это объект, дополнительно задающий стратегию организации очереди для потока. Принимает один параметр:
highWaterMark
— неотрицательное количество байтов, указывающее максимальный уровень потока, использующего эту стратегию организации очереди. Используется для определения обратной реакции, отражаемой в соответствующем свойствеReadableByteStreamController.desiredSize
. Также определяет, когда вызывается методpull()
базового источника.
Методы getReader()
и read()
#
Получить доступ к ReadableStreamBYOBReader
можно, соответствующим образом задав параметр mode
: ReadableStream.getReader({ mode: "byob" })
. Это позволяет более точно контролировать выделение буфера во избежание копирования. Чтобы читать из байтового потока, нужно вызвать ReadableStreamBYOBReader.read(view)
, где view
— ArrayBufferView
.
Пример кода с читаемым байтовым потоком #
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("Первые 1024 байта или меньше:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Следующая функция возвращает читаемые байтовые потоки, обеспечивающие эффективное чтение случайно сгенерированного массива с нулевым копированием. Она не использует предопределенный размер фрагмента (1024), а пытается заполнить предоставленный разработчиком буфер и обеспечивает полный контроль.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Даже если потребитель использует считыватель по умолчанию,
// функция автоматического выделения выделяет буфер и
// передает его нам через `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Принцип действия записываемого потока #
Записываемый поток — это пункт назначения для данных, в который можно писать. В JavaScript он представлен объектом WritableStream
. Он служит абстракцией поверх базового приемника — низкоуровневого приемника ввода-вывода, в который записываются первичные данные.
Данные пишутся в поток записывателем, по одному фрагменту за раз. Фрагмент может принимать различные формы, как и фрагмент в считывателе. Для создания готовых к записи фрагментов можно использовать любой код. Записыватель в сочетании с соответствующим кодом называется производителем.
Когда записыватель создан и начинает писать в поток (активный записыватель), говорят, что он прикрепляется к нему. В записываемый поток одномоментно может писать только один записыватель. Чтобы другой записыватель мог начать писать в поток, обычно необходимо освободить его и только затем прикреплять новый записыватель.
Внутренняя очередь отслеживает записанные в поток фрагменты, которые еще не были обработаны базовым приемником.
Стратегия организации очереди — это объект, определяющий, как поток должен давать обратную реакцию на основе состояния своей внутренней очереди. Стратегия организации очереди назначает каждому фрагменту размер и сравнивает общий размер всех фрагментов в очереди с заданным числом — максимальным уровнем.
Последний нужный нам объект здесь — контроллер. У каждого записываемого потока есть связанный с ним контроллер, который позволяет контролировать этот поток (например, прерывать).
Создание записываемого потока #
Интерфейс WritableStream
Streams API предоставляет стандартную абстракцию для записи потоковых данных в пункт назначения — приемник. У этого объекта есть встроенная обратная реакция и организация очереди. Чтобы создать записываемый поток, нужно вызвать его конструктор WritableStream()
. У него есть необязательный параметр underlyingSink
— объект с методами и свойствами, определяющими поведение создаваемого экземпляра потока.
Объект underlyingSink
#
В underlyingSink
могут быть следующие необязательные методы, определяемые разработчиком. Передаваемый в некоторые методы параметр controller
— это WritableStreamDefaultController
.
- Метод
start(controller)
: вызывается сразу после создания объекта. Его код должен обеспечивать доступ к базовому приемнику. Если этот процесс должен выполняться асинхронно, метод может вернуть промис для сигнализации о завершении операции или сбое. - Метод
write(chunk, controller)
: вызывается при появлении нового фрагмента данных (указанного в параметреchunk
), готового к записи в базовый приемник. Может вернуть промис для сигнализации о завершении операции или сбое. Вызывается только после успешного завершения предыдущих операций записи и никогда — после закрытия или прерывания потока. - Метод
close(controller)
: вызывается, если приложение сигнализирует о завершении записи фрагментов в поток. Код метода должен сделать всё необходимое для завершения операций записи в базовый приемник и освобождения доступа к нему. Если это асинхронный процесс, метод может вернуть промис для сигнализации о завершении операции или сбое. Вызывается только после успешного завершения всех операций записи в очереди. - Метод
abort(reason)
: вызывается, если приложение сигнализирует, что нужно внезапно закрыть поток и перевести его в состояние ошибки. Код метода должен очистить удерживаемые ресурсы, примерно как иclose()
, ноabort()
вызывается, даже если очередь операций записи не пуста. Соответствующие фрагменты будут отброшены. Если это асинхронный процесс, метод может вернуть промис для сигнализации о завершении операции или сбое. Параметрreason
содержитDOMString
с описанием причины, по которой поток был прерван.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Интерфейс WritableStreamDefaultController
в Streams API — это контроллер, позволяющий управлять состоянием WritableStream
во время настройки, когда на запись отправляется больше фрагментов, и в конце записи. При создании WritableStream
базовому приемнику дается соответствующий экземпляр WritableStreamDefaultController
для манипуляций. WritableStreamDefaultController
содержит только один метод: WritableStreamDefaultController.error()
, который вызывает ошибку при любом последующем взаимодействии с соответствующим потоком.
/* … */
write(chunk, controller) {
try {
// Пробуем сделать что-нибудь опасное с `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
Аргумент queuingStrategy
#
Второй (тоже необязательный) аргумент конструктора WritableStream()
— queuingStrategy
. Это объект, дополнительно задающий стратегию организации очереди для потока. Принимает два параметра:
highWaterMark
— неотрицательное число, указывающее максимальный уровень потока, использующего эту стратегию организации очереди;size(chunk)
— функция, которая вычисляет и возвращает конечный неотрицательный размер заданного значения фрагмента. Результат используется для определения обратной реакции, отражаемой в соответствующем свойствеWritableStreamDefaultWriter.desiredSize
.
Методы getWriter()
и write()
#
Чтобы писать в записываемый поток, нужен записыватель — WritableStreamDefaultWriter
. Метод getWriter()
интерфейса WritableStream
возвращает новый экземпляр WritableStreamDefaultWriter
и блокирует поток для этого экземпляра. Во время блокировки получить еще один записыватель нельзя, пока текущий не будет освобожден.
Метод write()
интерфейса WritableStreamDefaultWriter
записывает переданный фрагмент данных в поток WritableStream
и его базовый приемник, а затем возвращает промис, разрешение которого указывает на завершение или сбой операции записи. Здесь смысл «завершения» зависит от базового приемника и может указывать на то, что фрагмент был принят, но не обязательно надежно сохранен в конечном пункте назначения.
const writer = writableStream.getWriter();
const resultPromise = writer.write('Первый фрагмент!');
Свойство locked
#
Чтобы проверить, заблокирован ли записываемый поток, используйте его свойство WritableStream.locked
.
const locked = writableStream.locked;
console.log(`Поток ${locked ? 'indeed' : 'не'} заблокирован.`);
Пример кода с записываемым потоком #
В примере кода ниже показаны все этапы в действии.
const writableStream = new WritableStream({
start(controller) {
console.log('[запуск]');
},
async write(chunk, controller) {
console.log('[запись]', chunk);
// Ожидание следующей записи.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[закрытие]');
},
abort(reason) {
console.log('[прерван]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Ждем, чтобы добавить в очередь записи.
await writer.ready;
console.log('[готов]', Date.now() - start, 'ms');
// Промис разрешается после завершения записи.
writer.write(char);
}
await writer.close();
Передача читаемого потока в записываемый поток #
Читаемый поток можно с помощью метода pipeTo()
направить в записываемый поток. ReadableStream.pipeTo()
направляет текущий ReadableStream
в указанный WritableStream
и возвращает промис, который выполняется, когда процесс перенаправления завершается, и отклоняется в случае ошибки.
const readableStream = new ReadableStream({
start(controller) {
// Вызывается конструктором.
console.log('[запуск читаемого]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Вызывается, когда очередь контроллера пуста.
console.log('[извлечение]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Вызывается при отмене потока.
console.log('[отмена]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Вызывается конструктором
console.log('[запуск записываемого]');
},
async write(chunk, controller) {
// Вызывается после writer.write()
console.log('[запись]', chunk);
// Ожидание следующей записи.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[закрытие]');
},
abort(reason) {
console.log('[прерван]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[завершено]');
Создание преобразующего потока #
Интерфейс TransformStream
в Streams API представляет набор преобразующихся данных. Преобразующий поток создается вызовом конструктора TransformStream()
, который создает и возвращает объект преобразующего потока из заданных обработчиков. Первый аргумент конструктора TransformStream()
— необязательный объект JavaScript, представляющий transformer
(преобразователь). Такие объекты могут содержать любой из следующих методов:
Объект transformer
#
- Метод
start(controller)
: вызывается сразу после создания объекта. Обычно используется для постановки в очередь префиксных фрагментов — с помощьюcontroller.enqueue()
. Эти фрагменты считываются с читаемой стороны, но не зависят от операций записи на записываемой стороне. Если этот исходный процесс выполняется асинхронно (например, если для получения префиксных фрагментов требуются некоторые действия), функция может вернуть промис для сигнализации о завершении операции или сбое; отклоненный промис переведет поток в состояние ошибки. Все вызванные исключения будут повторно вызваны конструкторомTransformStream()
. - Метод
transform(chunk, controller)
: вызывается, когда новый фрагмент, изначально записанный на записываемую сторону, готов к преобразованию. Реализация потока гарантирует, что эта функция будет вызываться только после успешного завершения предыдущих преобразований, но не раньше, чемstart()
завершит работу, и не позже, чем будет вызванflush()
. Эта функция выполняет фактическую работу по преобразованию в преобразующем потоке. Она может поставить результаты в очередь с помощьюcontroller.enqueue()
. Благодаря этому один фрагмент, записанный на записываемой стороне, может дать ноль или несколько фрагментов на читаемой стороне, в зависимости от количества вызововcontroller.enqueue()
. В случае асинхронного процесса преобразования эта функция может возвращать промис для сигнализации о завершении или сбое операции. Отклоненный промис переведет в состояние ошибки и читаемую, и записываемую стороны преобразующего потока. Если методtransform()
не указан, используется преобразование идентичности: в очередь с записываемой стороны на читаемую ставятся неизменные фрагменты. - Метод
flush(controller)
: вызывается после того, как все фрагменты, записанные на записываемую сторону, были преобразованы посредствомtransform()
и записываемая сторона должна быть закрыта. Обычно используется для постановки суффиксных фрагментов в очередь на читаемую сторону, прежде чем она тоже закроется. Если процесс освобождения (flush) — асинхронный, функция может вернуть промис для сигнализации о завершении или сбое операции; результат будет передан туда, откуда был вызванstream.writable.write()
. Отклоненный промис переведет в состояние ошибки и читаемую, и записываемую стороны потока. Вызов исключения обрабатывается так же, как возврат отклоненного промиса.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Стратегии организации очереди writableStrategy
и readableStrategy
#
Второй и третий необязательные параметры конструктора TransformStream()
— стратегии организации очереди writableStrategy
и readableStrategy
. Их определение описано в разделах о читаемых и записываемых потоках соответственно.
Пример кода с преобразующим потоком #
В примере кода ниже показан несложный преобразующий поток в действии.
// Обратите внимание, что сейчас `TextEncoderStream` и `TextDecoderStream` уже есть.
// В примере показано, как это делалось бы раньше.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[преобразование]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[освобождение]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[значение]', result.value);
}
})();
Передача читаемого потока в преобразующий поток #
Метод pipeThrough()
интерфейса ReadableStream
обеспечивает цепное перенаправление текущего потока через преобразующий поток или любую другую пару «записываемый — читаемый». При перенаправлении потока он обычно блокируется, что не дает другим считывателям его заблокировать.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[преобразование]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[освобождение]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// вызывается конструктором
console.log('[запуск]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// вызываем `read`, когда очередь контроллера пуста
console.log('[извлечение]');
controller.enqueue('d');
controller.close(); // или controller.error();
},
cancel(reason) {
// вызывается при `rs.cancel(reason)`
console.log('[отмена]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[значение]', result.value);
}
})();
В следующем примере (немного надуманном) показано, как реализовать «кричащую» версию fetch()
, который переводит текст в верхний регистр, получая возвращенный промис ответа как поток и меняя регистр фрагментом за фрагментом. Преимущество — не нужно ждать загрузки всего документа, что может быть очень важно при работе с большими файлами.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Поддержка в браузерах и полифил #
Поддержка Streams API зависит от браузера. Подробнее о совместимости см. на странице Can I use. В некоторых браузерах определенные функции реализованы частично, поэтому обязательно сверяйтесь с этими данными.
Но есть и хорошая новость: доступна эталонная реализация и полифил, нацеленные на использование в реальных проектах.
Демонстрация #
В примере ниже — читаемые, записываемые и преобразующие потоки в действии, а также примеры конвейеров pipeThrough()
и pipeTo()
и раздвоения посредством tee()
. Демонстрацию можно запустить в отдельном окне или открыть ее исходный код.
Полезные потоки, доступные в браузере #
В браузер встроено несколько удобных на практике потоков. Из BLOB можно легко создать ReadableStream
. Метод stream() интерфейса Blob
возвращает ReadableStream
, который после чтения возвращает данные из BLOB. Вспомним также, что объект File
— это особый вид Blob
и может использоваться в том же контексте, что и BLOB.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Потоковые варианты TextDecoder.decode()
и TextEncoder.encode()
называются TextDecoderStream
и TextEncoderStream
соответственно.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Сжимать и распаковывать файлы удобно преобразующими потоками CompressionStream
и DecompressionStream
соответственно. В примере ниже показано, как загрузить спецификацию Streams, сжать ее (gzip) ее сразу в браузере и записать сжатый файл напрямую на диск.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
Поток FileSystemWritableFileStream
в File System Access API и экспериментальные потоки запроса fetch()
— примеры реального применения записываемых потоков.
В Serial API широко используются и читаемые, и записываемые потоки.
// Предложить пользователю выбрать последовательный порт.
const port = await navigator.serial.requestPort();
// Дождаться открытия последовательного порта.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Слушать данные, поступающие от устройства с последовательным интерфейсом.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Разрешить закрытие последовательного порта позже.
reader.releaseLock();
break;
}
// значение `value` — Uint8Array.
console.log(value);
}
// Записать в последовательный порт.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Разрешить закрытие последовательного порта позже.
writer.releaseLock();
Наконец, WebSocketStream
API встраивает потоки в WebSocket API.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Полезные ресурсы #
- Спецификация Streams API.
- Демонстрации.
- Полифил для потоков.
- 2016 г. — год веб-потоков.
- Асинхронные итераторы и генераторы.
- Визуализатор потоков.
Благодарности #
Статью просматривали и проверяли: Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley и Adam Rice. Публикация Джейка Арчибальда в блоге помогла мне разобраться в потоках. Идеи для некоторых примеров частично взяты из кода пользователя @bellbind на GitHub. Текст частично основан на веб-документации MDN по потокам. Авторы стандарта Streams проделали огромную работу по написанию этой спецификации. Главное изображение — Ryan Lara (Unsplash).