Потоки — полное руководство

Узнайте, как использовать потоки, доступные для чтения, записи и преобразования, с помощью Streams API.

API Streams позволяет программно получать доступ к потокам данных, полученных по сети или созданных любым способом локально, и обрабатывать их с помощью JavaScript. Потоковая передача предполагает разбиение ресурса, который вы хотите получить, отправить или преобразовать, на небольшие фрагменты и последующую постепенную обработку этих фрагментов. Несмотря на то, что потоковая передача — это то, что браузеры в любом случае делают при получении ресурсов, таких как HTML или видео, для отображения на веб-страницах, эта возможность никогда не была доступна для JavaScript до тех пор, пока в 2015 году не была представлена fetch с потоками.

Раньше, если вы хотели обработать какой-то ресурс (будь то видео, текстовый файл и т.п.), вам приходилось скачивать весь файл целиком, ждать его десериализации в подходящий формат, а затем обрабатывать это. Когда потоки стали доступны для JavaScript, все меняется. Теперь вы можете постепенно обрабатывать необработанные данные с помощью JavaScript, как только они станут доступны на клиенте, без необходимости создания буфера, строки или большого двоичного объекта. Это открывает ряд вариантов использования, некоторые из которых я перечисляю ниже:

  • Видеоэффекты: передача читаемого видеопотока через поток преобразования, который применяет эффекты в реальном времени.
  • (де)сжатие данных: передача потока файлов через поток преобразования, который выборочно (де)сжимает его.
  • Декодирование изображения: передача потока ответов HTTP через поток преобразования, который декодирует байты в данные растрового изображения, а затем через другой поток преобразования, который преобразует растровые изображения в PNG. Если он установлен внутри обработчика fetch сервисного работника, он позволяет прозрачно заполнять новые форматы изображений, такие как AVIF.

Поддержка браузера

ReadableStream и WritableStream.

Поддержка браузера

  • Хром: 43.
  • Край: 14.
  • Фаерфокс: 65.
  • Сафари: 10.1.

Источник

ТрансформироватьСтрим

Поддержка браузера

  • Хром: 67.
  • Край: 79.
  • Фаерфокс: 102.
  • Сафари: 14.1.

Источник

Основные понятия

Прежде чем я подробно расскажу о различных типах потоков, позвольте мне представить некоторые основные концепции.

куски

Чанк — это отдельный фрагмент данных , который записывается в поток или считывается из него. Он может быть любого типа; потоки могут даже содержать фрагменты разных типов. В большинстве случаев фрагмент не будет самой атомарной единицей данных для данного потока. Например, поток байтов может содержать фрагменты, состоящие из блоков Uint8Array по 16 КиБ, а не из отдельных байтов.

Читаемые потоки

Читаемый поток представляет собой источник данных, из которых вы можете читать. Другими словами, данные поступают из читаемого потока. Конкретно, читаемый поток — это экземпляр класса ReadableStream .

Записываемые потоки

Доступный для записи поток представляет собой место назначения данных, в которые вы можете записывать. Другими словами, данные поступают в записываемый поток. Конкретно, записываемый поток является экземпляром класса WritableStream .

Преобразование потоков

Поток преобразования состоит из пары потоков : записываемого потока, известного как его записываемая сторона, и читаемого потока, известного как его читаемая сторона. Реальной метафорой этого может быть синхронный переводчик , который переводит с одного языка на другой «на лету». Способом, специфичным для потока преобразования, запись на записываемую сторону приводит к тому, что новые данные становятся доступными для чтения с читаемой стороны. Конкретно, любой объект со свойством, writable и свойством, readable , может служить потоком преобразования. Однако стандартный класс TransformStream упрощает создание такой пары, которая правильно запутана.

Трубчатые цепи

Потоки в основном используются для передачи их друг другу. Читаемый поток можно передать непосредственно в записываемый поток, используя метод pipeTo() читаемого потока, или сначала его можно передать через один или несколько потоков преобразования, используя метод pipeThrough() читаемого потока. Набор потоков, соединенных вместе таким образом, называется цепочкой каналов.

Противодавление

Как только цепочка каналов построена, она будет распространять сигналы о том, как быстро через нее должны проходить куски. Если какой-либо шаг в цепочке еще не может принять фрагменты, он распространяет сигнал обратно по цепочке каналов, пока в конечном итоге исходному источнику не будет приказано прекратить производить фрагменты так быстро. Этот процесс нормализации потока называется противодавлением.

тройка

Читаемый поток может быть открыт (назван в честь буквы «Т») с помощью метода tee() . Это заблокирует поток, то есть сделает его недоступным для прямого использования; однако он создаст два новых потока , называемых ветвями, которые можно будет использовать независимо. Титинг также важен, поскольку потоки нельзя перемотать или перезапустить, подробнее об этом позже.

Схема цепочки каналов, состоящей из читаемого потока, поступающего в результате вызова API-интерфейса выборки, который затем передается через поток преобразования, выходные данные которого передаются, а затем отправляется в браузер для первого результирующего читаемого потока и в кеш сервисного работника для второй результирующий читаемый поток.
Цепь труб.

Механика читаемого потока

Читаемый поток — это источник данных, представленный в JavaScript объектом ReadableStream , который поступает из базового источника. Конструктор ReadableStream() создает и возвращает читаемый объект потока из заданных обработчиков. Существует два типа основного источника:

  • Источники push-уведомлений постоянно передают вам данные, когда вы получили к ним доступ, и вы можете запустить, приостановить или отменить доступ к потоку. Примеры включают прямые видеопотоки, события, отправляемые сервером, или WebSockets.
  • Источники извлечения требуют, чтобы вы явно запрашивали у них данные после подключения. Примеры включают операции HTTP через вызовы fetch() или XMLHttpRequest .

Потоковые данные считываются последовательно небольшими частями, называемыми частями . Говорят, что фрагменты, помещенные в поток, ставятся в очередь . Это означает, что они ждут в очереди, готовые к чтению. Внутренняя очередь отслеживает фрагменты, которые еще не были прочитаны.

Стратегия организации очереди — это объект, который определяет, как поток должен сигнализировать о противодавлении на основе состояния его внутренней очереди. Стратегия организации очередей присваивает размер каждому куску и сравнивает общий размер всех кусков в очереди с указанным числом, известным как верхняя граница .

Фрагменты внутри потока читаются программой чтения . Этот читатель извлекает данные по частям за раз, позволяя вам выполнять с ними любые операции, которые вы хотите. Читатель плюс другой код обработки, который сопровождает его, называется потребителем .

Следующая конструкция в этом контексте называется контроллером . С каждым читаемым потоком связан контроллер, который, как следует из названия, позволяет вам управлять потоком.

Только один читатель может читать поток одновременно; когда читатель создается и начинает читать поток (то есть становится активным читателем ), он привязывается к нему. Если вы хотите, чтобы другой читатель взял на себя чтение вашего потока, вам обычно нужно освободить первого читателя, прежде чем делать что-либо еще (хотя вы можете развернуть потоки).

Создание читаемого потока

Вы создаете читаемый поток, вызывая его конструктор ReadableStream() . Конструктор имеет необязательный аргумент, underlyingSource , который представляет объект с методами и свойствами, которые определяют, как будет вести себя созданный экземпляр потока.

underlyingSource

При этом могут использоваться следующие дополнительные методы, определенные разработчиком:

  • start(controller) : вызывается сразу после создания объекта. Метод может получить доступ к источнику потока и выполнить все остальные действия, необходимые для настройки функциональности потока. Если этот процесс должен выполняться асинхронно, метод может вернуть обещание, сигнализирующее об успехе или неудаче. Параметр controller , передаваемый этому методу, — это ReadableStreamDefaultController .
  • pull(controller) : может использоваться для управления потоком по мере извлечения большего количества фрагментов. Он вызывается повторно до тех пор, пока внутренняя очередь фрагментов потока не заполнена, вплоть до тех пор, пока очередь не достигнет своей верхней отметки. Если результатом вызова pull() является обещание, pull() не будет вызываться снова, пока указанное обещание не будет выполнено. Если обещание отклоняется, поток станет ошибочным.
  • cancel(reason) : вызывается, когда потребитель потока отменяет поток.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController поддерживает следующие методы:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Второй, также необязательный, аргумент конструктора ReadableStream()queuingStrategy . Это объект, который опционально определяет стратегию организации очереди для потока, который принимает два параметра:

  • highWaterMark : неотрицательное число, указывающее верхнюю границу потока, использующего эту стратегию организации очереди.
  • size(chunk) : функция, которая вычисляет и возвращает конечный неотрицательный размер данного значения фрагмента. Результат используется для определения противодавления, проявляющегося через соответствующее свойство ReadableStreamDefaultController.desiredSize . Он также определяет, когда вызывается метод pull() базового источника.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Методы getReader() и read()

Для чтения из читаемого потока вам понадобится программа чтения, которой будет ReadableStreamDefaultReader . Метод getReader() интерфейса ReadableStream создает средство чтения и закрепляет за ним поток. Пока поток заблокирован, никакое другое устройство чтения не может быть получено, пока не будет освобождено это.

Метод read() интерфейса ReadableStreamDefaultReader возвращает обещание, предоставляющее доступ к следующему фрагменту во внутренней очереди потока. Он выполняет или отклоняет результат в зависимости от состояния потока. Различные возможности заключаются в следующем:

  • Если чанк доступен, обещание будет выполнено с объектом вида
    { value: chunk, done: false } .
  • Если поток станет закрытым, обещание будет выполнено с помощью объекта вида
    { value: undefined, done: true } .
  • Если поток окажется ошибочным, обещание будет отклонено с соответствующей ошибкой.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked имущество

Вы можете проверить, заблокирован ли читаемый поток, обратившись к его свойству ReadableStream.locked .

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Читабельные примеры кода потока

В приведенном ниже примере кода показаны все шаги в действии. Сначала вы создаете ReadableStream , который в своем underlyingSource аргументеSource (то есть классе TimestampSource ) определяет метод start() . Этот метод сообщает controller потока enqueue() метку времени каждую секунду в течение десяти секунд. Наконец, он сообщает контроллеру о необходимости close() поток. Вы потребляете этот поток, создавая средство чтения с помощью метода getReader() и вызывая read() до тех пор, пока поток не будет done .

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Асинхронная итерация

Проверка на каждой итерации цикла read() done потока может оказаться не самым удобным API. К счастью, скоро появится лучший способ сделать это: асинхронная итерация.

for await (const chunk of stream) {
  console.log(chunk);
}

Обходной путь использования асинхронной итерации сегодня — реализация поведения с помощью полифилла.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Создание читаемого потока

Метод tee() интерфейса ReadableStream выделяет текущий читаемый поток, возвращая массив из двух элементов, содержащий две результирующие ветви в качестве новых экземпляров ReadableStream . Это позволяет двум читателям читать поток одновременно. Вы можете сделать это, например, в сервис-воркере, если хотите получить ответ с сервера и передать его в браузер, а также передать его в кэш сервис-воркера. Поскольку тело ответа не может быть использовано более одного раза, для этого вам понадобятся две копии. Чтобы отменить поток, вам необходимо отменить обе результирующие ветки. Включение потока обычно блокирует его на время, не позволяя другим читателям заблокировать его.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Читаемые потоки байтов

Для потоков, представляющих байты, предоставляется расширенная версия читаемого потока для эффективной обработки байтов, в частности, путем минимизации копий. Потоки байтов позволяют приобретать считыватели с собственным буфером (BYOB). Реализация по умолчанию может предоставлять ряд различных выходных данных, таких как строки или буферы массивов в случае WebSockets, тогда как потоки байтов гарантируют вывод байтов. Кроме того, считыватели BYOB имеют преимущества стабильности. Это связано с тем, что если буфер отсоединяется, это может гарантировать, что никто не будет записывать в один и тот же буфер дважды, что позволяет избежать состояний гонки. Читатели BYOB могут сократить количество запусков браузером сборки мусора, поскольку они могут повторно использовать буферы.

Создание читаемого потока байтов

Вы можете создать читаемый поток байтов, передав дополнительный параметр type конструктору ReadableStream() .

new ReadableStream({ type: 'bytes' });

underlyingSource

Базовому источнику читаемого потока байтов предоставляется ReadableByteStreamController для управления. Его метод ReadableByteStreamController.enqueue() принимает аргумент chunk , значением которого является ArrayBufferView . Свойство ReadableByteStreamController.byobRequest возвращает текущий запрос на извлечение BYOB или значение NULL, если его нет. Наконец, свойство ReadableByteStreamController.desiredSize возвращает желаемый размер для заполнения внутренней очереди управляемого потока.

queuingStrategy

Второй, также необязательный, аргумент конструктора ReadableStream()queuingStrategy . Это объект, который опционально определяет стратегию организации очереди для потока, который принимает один параметр:

  • highWaterMark : неотрицательное число байтов, указывающее верхнюю границу потока, использующего эту стратегию организации очереди. Это используется для определения противодавления, проявляющегося через соответствующее свойство ReadableByteStreamController.desiredSize . Он также определяет, когда вызывается метод pull() базового источника.

Методы getReader() и read()

Затем вы можете получить доступ к ReadableStreamBYOBReader соответствующим образом установив параметр mode : ReadableStream.getReader({ mode: "byob" }) . Это позволяет более точно контролировать распределение буфера, чтобы избежать копирования. Чтобы прочитать поток байтов, вам нужно вызвать ReadableStreamBYOBReader.read(view) , где view — это ArrayBufferView .

Пример кода читаемого потока байтов

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Следующая функция возвращает читаемые потоки байтов, которые позволяют эффективно читать без копирования случайно сгенерированный массив. Вместо использования заранее определенного размера фрагмента 1024 он пытается заполнить буфер, предоставленный разработчиком, обеспечивая полный контроль.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Механика записываемого потока

Доступный для записи поток — это место назначения, в которое вы можете записывать данные, представленные в JavaScript объектом WritableStream . Это служит абстракцией поверх базового приемника — приемника ввода-вывода нижнего уровня, в который записываются необработанные данные.

Данные записываются в поток через модуль записи по частям за раз. Чанк может принимать множество форм, как и чанки в программе чтения. Вы можете использовать любой код, который вам нравится, для создания фрагментов, готовых к записи; автор и связанный с ним код называются производителем .

Когда писатель создается и начинает писать в поток ( активный писатель ), говорят, что он заблокирован для него. Только один писатель может одновременно писать в записываемый поток. Если вы хотите, чтобы другой писатель начал писать в ваш поток, вам обычно необходимо освободить его, прежде чем прикреплять к нему другого писателя.

Внутренняя очередь отслеживает фрагменты, которые были записаны в поток, но еще не обработаны базовым приемником.

Стратегия организации очереди — это объект, который определяет, как поток должен сигнализировать о противодавлении на основе состояния его внутренней очереди. Стратегия организации очередей присваивает размер каждому куску и сравнивает общий размер всех кусков в очереди с указанным числом, известным как верхняя граница .

Последняя конструкция называется контроллером . С каждым доступным для записи потоком связан контроллер, который позволяет вам управлять потоком (например, прерывать его).

Создание записываемого потока

Интерфейс WritableStream API Streams предоставляет стандартную абстракцию для записи потоковых данных в пункт назначения, известный как приемник. Этот объект имеет встроенное противодавление и организацию очередей. Вы создаете записываемый поток, вызывая его конструктор WritableStream() . У него есть необязательный параметр underlyingSink , который представляет объект с методами и свойствами, которые определяют, как будет вести себя созданный экземпляр потока.

underlyingSink

underlyingSink может включать следующие необязательные методы, определенные разработчиком. Параметр controller , передаваемый некоторым методам, представляет собой WritableStreamDefaultController .

  • start(controller) : этот метод вызывается сразу после создания объекта. Содержимое этого метода должно быть направлено на получение доступа к базовому приемнику. Если этот процесс должен выполняться асинхронно, он может вернуть обещание, сигнализирующее об успехе или неудаче.
  • write(chunk, controller) : этот метод будет вызываться, когда новый фрагмент данных (указанный в параметре chunk ) готов к записи в базовый приемник. Он может вернуть обещание, сигнализирующее об успехе или неудаче операции записи. Этот метод будет вызываться только после успешного завершения предыдущих операций записи и никогда после закрытия или прерывания потока.
  • close(controller) : этот метод будет вызываться, если приложение сигнализирует о завершении записи фрагментов в поток. Содержимое должно делать все необходимое для завершения записи в базовый приемник и освобождения доступа к нему. Если этот процесс асинхронный, он может вернуть обещание, сигнализирующее об успехе или неудаче. Этот метод будет вызываться только после успешного завершения всех операций записи в очередь.
  • abort(reason) : этот метод будет вызываться, если приложение сигнализирует о желании внезапно закрыть поток и перевести его в состояние с ошибкой. Он может очищать любые удерживаемые ресурсы, как и close() , но abort() будет вызываться, даже если записи поставлены в очередь. Эти куски будут выброшены. Если этот процесс асинхронный, он может вернуть обещание, сигнализирующее об успехе или неудаче. Параметр reason содержит DOMString , описывающую, почему поток был прерван.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Интерфейс WritableStreamDefaultController API Streams представляет собой контроллер, позволяющий управлять состоянием WritableStream во время настройки, когда на запись отправляется больше фрагментов или в конце записи. При создании WritableStream базовому приемнику предоставляется соответствующий экземпляр WritableStreamDefaultController для управления. WritableStreamDefaultController имеет только один метод: WritableStreamDefaultController.error() , который приводит к ошибке при любых будущих взаимодействиях со связанным потоком. WritableStreamDefaultController также поддерживает свойство signal , которое возвращает экземпляр AbortSignal , позволяя при необходимости остановить операцию WritableStream .

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Второй, также необязательный аргумент конструктора WritableStream()queuingStrategy . Это объект, который опционально определяет стратегию организации очереди для потока, который принимает два параметра:

  • highWaterMark : неотрицательное число, указывающее верхнюю границу потока, использующего эту стратегию организации очереди.
  • size(chunk) : функция, которая вычисляет и возвращает конечный неотрицательный размер данного значения фрагмента. Результат используется для определения противодавления, проявляющегося через соответствующее свойство WritableStreamDefaultWriter.desiredSize .

Методы getWriter() и write()

Для записи в записываемый поток вам понадобится модуль записи, которым будет WritableStreamDefaultWriter . Метод getWriter() интерфейса WritableStream возвращает новый экземпляр WritableStreamDefaultWriter и привязывает поток к этому экземпляру. Пока поток заблокирован, никакой другой писатель не может быть получен до тех пор, пока не будет освобожден текущий.

Метод write() интерфейса WritableStreamDefaultWriter записывает переданный фрагмент данных в WritableStream и его базовый приемник, а затем возвращает обещание, которое разрешается, чтобы указать успех или неудачу операции записи. Обратите внимание: что означает «успех», зависит от основного приемника; это может указывать на то, что фрагмент принят, и не обязательно, что он благополучно сохранен в конечном пункте назначения.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked имущество

Вы можете проверить, заблокирован ли записываемый поток, обратившись к его свойству WritableStream.locked .

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Пример кода записываемого потока

В примере кода ниже показаны все шаги в действии.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Передача читаемого потока в записываемый поток

Читаемый поток можно передать в записываемый поток с помощью метода pipeTo() читаемого потока. ReadableStream.pipeTo() передает текущий ReadableStream в заданный WritableStream и возвращает обещание, которое выполняется при успешном завершении процесса конвейеризации или отклоняется, если обнаружены какие-либо ошибки.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Создание потока преобразования

Интерфейс TransformStream API Streams представляет собой набор преобразуемых данных. Вы создаете поток преобразования, вызывая его конструктор TransformStream() , который создает и возвращает объект потока преобразования из заданных обработчиков. Конструктор TransformStream() принимает в качестве первого аргумента необязательный объект JavaScript, представляющий transformer . Такие объекты могут содержать любой из следующих методов:

transformer

  • start(controller) : этот метод вызывается сразу после создания объекта. Обычно это используется для постановки в очередь фрагментов префикса с помощью controller.enqueue() . Эти фрагменты будут считываться с доступной для чтения стороны, но не будут зависеть от каких-либо операций записи на записываемую сторону. Если этот первоначальный процесс является асинхронным, например, потому что для получения фрагментов префикса требуются некоторые усилия, функция может вернуть обещание, чтобы сигнализировать об успехе или неудаче; отклоненное обещание вызовет ошибку в потоке. Любые выброшенные исключения будут повторно созданы конструктором TransformStream() .
  • transform(chunk, controller) : этот метод вызывается, когда новый фрагмент, изначально записанный на записываемую сторону, готов к преобразованию. Реализация потока гарантирует, что эта функция будет вызвана только после успешного завершения предыдущих преобразований и никогда до завершения start() или после flush() . Эта функция выполняет фактическую работу по преобразованию потока преобразования. Он может поставить результаты в очередь с помощью controller.enqueue() . Это позволяет одному фрагменту, записанному на записываемую сторону, привести к нулю или нескольким фрагментам на читаемой стороне, в зависимости от того, сколько раз вызывается controller.enqueue() . Если процесс преобразования является асинхронным, эта функция может вернуть обещание, сигнализирующее об успешном или неудачном преобразовании. Отклоненное обещание приведет к ошибкам как на читаемой, так и на записываемой стороне потока преобразования. Если метод transform() не указан, используется преобразование идентификаторов, которое ставит в очередь фрагменты без изменений с записываемой стороны на читаемую.
  • flush(controller) : этот метод вызывается после того, как все фрагменты, записанные на записываемую сторону, были преобразованы путем успешного прохождения transform() и записываемая сторона вот-вот будет закрыта. Обычно это используется для постановки в очередь фрагментов суффикса на читаемую сторону, прежде чем она тоже станет закрытой. Если процесс очистки является асинхронным, функция может вернуть обещание, сигнализирующее об успехе или неудаче; результат будет передан вызывающему stream.writable.write() . Кроме того, отклоненное обещание приведет к ошибкам как на читаемой, так и на записываемой стороне потока. Создание исключения рассматривается так же, как возврат отклоненного обещания.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Стратегии организации очередей writableStrategy и readableStrategy

Второй и третий необязательные параметры конструктора TransformStream() — это необязательные стратегии организации очередей writableStrategy и readableStrategy . Они определены, как указано в разделах потоков для чтения и записи соответственно.

Пример кода преобразования потока

В следующем примере кода показан простой поток преобразования в действии.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Передача читаемого потока через поток преобразования

Метод pipeThrough() интерфейса ReadableStream обеспечивает цепной способ передачи текущего потока через поток преобразования или любую другую пару, доступную для записи и чтения. Передача потока обычно блокирует его на время работы канала, не позволяя другим читателям заблокировать его.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Следующий пример кода (немного надуманный) показывает, как можно реализовать «кричащую» версию fetch() , которая переводит весь текст в верхний регистр, используя возвращаемое обещание ответа в виде потока и вводя верхний регистр по частям. Преимущество этого подхода в том, что вам не нужно ждать загрузки всего документа, что может иметь огромное значение при работе с большими файлами.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Демо

В приведенной ниже демонстрации показаны потоки, доступные для чтения, записи и преобразования, в действии. Он также включает примеры цепочек каналов pipeThrough() и pipeTo() , а также демонстрирует tee() . При желании вы можете запустить демо-версию в отдельном окне или просмотреть исходный код .

Полезные потоки, доступные в браузере

Прямо в браузер встроено несколько полезных потоков. Вы можете легко создать ReadableStream из большого двоичного объекта. Методstream () интерфейса Blob возвращает ReadableStream , который при чтении возвращает данные, содержащиеся в большом двоичном объекте. Также помните, что объект File представляет собой особый вид Blob и может использоваться в любом контексте, в котором может использоваться blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Потоковые варианты TextDecoder.decode() и TextEncoder.encode() называются TextDecoderStream и TextEncoderStream соответственно.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Сжать или распаковать файл легко с помощью потоков преобразования CompressionStream и DecompressionStream соответственно. В приведенном ниже примере кода показано, как можно загрузить спецификацию Streams, сжать ее (gzip) прямо в браузере и записать сжатый файл непосредственно на диск.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream API доступа к файловой системе и экспериментальные потоки запросов fetch() являются примерами доступных для записи потоков.

Serial API интенсивно использует как читаемые, так и записываемые потоки.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Наконец, API WebSocketStream интегрирует потоки с API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Полезные ресурсы

Благодарности

Эту статью рецензировали Джейк Арчибальд , Франсуа Бофорт , Сэм Даттон , Маттиас Бьюленс , Сурма , Джо Медли и Адам Райс . Сообщения в блоге Джейка Арчибальда очень помогли мне в понимании потоков. Некоторые примеры кода вдохновлены исследованиями пользователя GitHub @bellbind , а некоторые части текста во многом основаны на веб-документах MDN на Streams . Авторы Streams Standard проделали огромную работу по написанию этой спецификации. Героическое изображение Райана Лары на Unsplash .