Strumienie – kompleksowy przewodnik

Dowiedz się, jak używać strumieni czytelnych, zapisywalnych i przekształcanych za pomocą interfejsu Streams API.

Streams API pozwala automatycznie uzyskiwać dostęp do strumieni danych otrzymanych przez sieć lub utworzonych lokalnie i przetwarzać je za pomocą JavaScriptu. Strumieniowe przesyłanie polega na rozbiciu zasobu, który chcesz otrzymać, wysłać lub przekształcić, na małe fragmenty, a następnie przetwarzaniu tych fragmentów bit po bicie. Przesyłanie strumieniowe jest czymś, co przeglądarki robią w przypadku zasobów takich jak HTML czy filmy, które mają być wyświetlane na stronach internetowych. Jednak ta funkcja nie była nigdy dostępna dla JavaScriptu przed wprowadzeniem w 2015 r. strumieni danych.

Wcześniej, jeśli chciałeś przetworzyć jakiś zasób (np. film lub plik tekstowy), musiałeś pobrać cały plik, poczekać, aż zostanie on zdeserializowany do odpowiedniego formatu, a potem przetworzyć go. Dzięki strumieniom dostępnym w JavaScript wszystko się zmienia. Teraz możesz przetwarzać dane nieprzetworzone za pomocą JavaScriptu stopniowo, gdy tylko będą dostępne na kliencie, bez konieczności generowania bufora, ciągu znaków ani bloba. Umożliwia to wiele zastosowań, w tym:

  • Efekty wideo: przekazywanie czytelnego strumienia wideo przez strumień przekształcenia, który stosuje efekty w czasie rzeczywistym.
  • Kompresja i dekompresja danych: przekierowanie strumienia plików przez strumień transformacji, który selektywnie je kompresuje lub dekompresuje.
  • Dekodowanie obrazów: przesyłanie potoku odpowiedzi HTTP przez strumień przekształcenia, który dekoduje bajty na dane bitmapowe, a potem przez inny strumień przekształcenia, który przekształca mapy bitowe na pliki PNG. Jeśli jest zainstalowany w obiekcie fetch w ramach usługi pracującej w tle, umożliwia to przejrzyste wypełnianie nowych formatów obrazów, takich jak AVIF.

Obsługa przeglądarek

ReadableStream i WritableStream

Obsługa przeglądarek

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Źródło

TransformStream

Obsługa przeglądarek

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Źródło

Podstawowe pojęcia

Zanim omówię szczegółowo różne typy strumieni, przedstawię kilka podstawowych pojęć.

kawałki

Kawałek to pojedynczy element danych zapisywany w strumieniu lub odczytywany z niego. Może to być dowolny typ; strumienie mogą zawierać fragmenty o różnych typach. W większości przypadków fragment nie będzie najbardziej atomową jednostką danych w danym strumieniu. Na przykład strumień bajtów może zawierać fragmenty o wielkości 16 KiBUint8Array zamiast pojedynczych bajtów.

Czytelne strumienie

Czytalny strumień to źródło danych, z którego możesz odczytywać dane. Innymi słowy, dane wychodzą z czytelnego strumienia. Strumień do odczytu to konkretnie instancja klasy ReadableStream.

Strumienie z możliwością zapisu

Strumień z możliwością zapisu reprezentuje miejsce docelowe danych, do których możesz zapisywać dane. Inaczej mówiąc, dane przechodzą do strumienia do zapisu. Strumień możliwy do zapisu jest instancją klasy WritableStream.

Przekształcanie strumieni

Strumień transformacji składa się z 2 strumieni: strumienia do zapisu (czyli strony zapisu) oraz strumienia do odczytu (czyli strony odczytu). W realnym świecie metaforą tego jest tłumacz symultaniczny, który na bieżąco tłumaczy z jednego języka na inny. W sposób specyficzny dla strumienia transformacji zapisywanie danych po stronie zapisu powoduje, że nowe dane są dostępne do odczytu po stronie odczytu. Konkretnie, strumień transformacji może być reprezentowany przez dowolny obiekt z właściwością writable i właściwością readable. Standardowa klasa TransformStream ułatwia jednak tworzenie par, które są prawidłowo splecione.

Łańcuchy do rur

Strumienie są głównie używane do przesyłania ich do siebie nawzajem. Czytelny strumień można dodać bezpośrednio do strumienia możliwego do zapisu przy użyciu metody pipeTo() przeznaczonego do odczytu lub można to zrobić najpierw przez co najmniej 1 strumień przekształcenia, używając metody pipeThrough() zrozumiałego strumienia. Zbiór połączonych ze sobą strumieni nazywamy łańcuchem.

Ciśnienie wsteczne

Po utworzeniu łańcucha potoków zostaną przesłane sygnały dotyczące tego, jak szybkie fragmenty powinny przez niego przepływać. Jeśli któryś z etapów w łańcuchu nie może jeszcze przyjąć fragmentów, rozprzestrzenia on sygnał w przeciwnym kierunku przez cały łańcuch, aż do momentu, gdy pierwotne źródło otrzyma informację o zaprzestaniu tak szybkiego generowania fragmentów. Ten proces normalizowania przepływu jest nazywany obciążeniem wstecznym.

Rozgrywka

Czytelny strumień można powiązać (nazwany zgodnie z kształtem wielkiej litery „T”) przy użyciu metody tee(). Spowoduje to zablokowanie strumienia, co oznacza, że nie będzie można go używać bezpośrednio. Spowoduje to jednak utworzenie 2 nowych strumieni, zwanych gałęziami, które można odtwarzać niezależnie. Jest to ważne, ponieważ strumieni nie można cofnąć ani ponownie uruchomić. Więcej informacji na ten temat znajdziesz poniżej.

Schemat łańcucha przekierowań składającego się z strumienia czytelnego pochodzącego z wywołania interfejsu fetch API, który jest następnie przekierowywany przez strumień transformacji, którego dane wyjściowe są rozdzielane i wysyłane do przeglądarki (dla pierwszego strumienia czytelnego) oraz do pamięci podręcznej serwisu workera (dla drugiego strumienia czytelnego).
Łańcuch potoków.

Mechanika czytelnego strumienia

Czytelny strumień to źródło danych reprezentowane w JavaScriptzie przez obiekt ReadableStream, który pochodzi z podstawowego źródła. Konstruktor ReadableStream()tworzy i zwraca obiekt strumienia do odczytu na podstawie podanych modułów obsługi. Istnieją 2 rodzaje źródła:

  • Źródła push stale przesyłają dane po uzyskaniu dostępu. To od Ciebie zależy, czy chcesz rozpocząć, wstrzymać lub anulować dostęp do strumienia. Przykłady to strumienie wideo na żywo, zdarzenia wysyłane przez serwer lub WebSockets.
  • Źródła typu „pull” wymagają, aby po nawiązaniu połączenia z nimi wysłać do nich wyraźne żądanie danych. Przykłady obejmują operacje HTTP za pomocą wywołań fetch() lub XMLHttpRequest.

Dane strumienia są odczytywane sekwencyjnie w mniejszych częściach zwanych fragmentami. Fragmenty umieszczone w strumieniu są wstawiane do kolejki. Oznacza to, że są one w kolejce i czekają na odczytanie. Kolejka wewnętrzna śledzi fragmenty, które nie zostały jeszcze przeczytane.

Strategia kolejkowania to obiekt, który określa, jak strumień powinien sygnalizować odwrotny nacisk na podstawie stanu kolejki wewnętrznej. Strategia kolejkowania przypisuje rozmiar każdemu fragmentowi i porównuje łączny rozmiar wszystkich fragmentów w kolejce z określoną liczbą, nazywaną wysokim znakiem wodnym.

Fragmenty wewnątrz strumienia są odczytywane przez czytelnika. Ten czytnik pobiera dane po kawałku, co pozwala na wykonywanie na nich dowolnych operacji. Czytnik wraz z innym kodem przetwarzania, który jest z nim powiązany, jest nazywany konsumentem.

Kolejna konstrukcja w tym kontekście nazywa się kontrolerem. Każdy czytelny strumień ma powiązany kontroler, który, jak sama nazwa wskazuje, umożliwia sterowanie strumieniem.

Tylko jeden czytnik może odczytywać strumień naraz. Gdy czytnik zostanie utworzony i zacznie odczytywać strumień (czyli stanie się aktywnym czytnikiem), zostaje zablokowany na nim. Jeśli chcesz, aby inny czytelnik przejął czytanie strumienia, musisz najpierw zwolnić pierwszego czytelnika (chociaż możesz też rozgałęzić strumień).

Tworzenie czytelnego strumienia

Aby utworzyć strumień do odczytu, wywołaj jego konstruktor: ReadableStream(). Konstruktor ma opcjonalny argument underlyingSource, który reprezentuje obiekt z metodami i właściwościami określającymi sposób zachowania utworzonej instancji strumienia.

underlyingSource

Może to odbywać się za pomocą tych opcjonalnych metod zdefiniowanych przez dewelopera:

  • start(controller): jest wywoływana natychmiast po utworzeniu obiektu. Metoda może uzyskiwać dostęp do źródła strumienia i wykonywać inne czynności wymagane do skonfigurowania funkcji strumienia. Jeśli ten proces ma być wykonywany asynchronicznie, metoda może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Parametr controller przekazany do tej metody to ReadableStreamDefaultController.
  • pull(controller): może służyć do sterowania strumieniem podczas pobierania kolejnych fragmentów. Jest ona wywoływana wielokrotnie, dopóki wewnętrzna kolejka fragmentów strumienia nie jest pełna, aż do osiągnięcia najwyższego poziomu. Jeśli wywołanie funkcji pull() spowoduje obietnicę, funkcja pull() nie zostanie wywołana ponownie, dopóki nie zostanie spełniona obietnica. Jeśli obietnica zostanie odrzucona, podczas transmisji wystąpi błąd.
  • cancel(reason): jest wywoływany, gdy konsument strumienia anuluje strumień.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController obsługuje te metody:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Drugi, również opcjonalny, argument konstruktora ReadableStream() to queuingStrategy. Jest to obiekt, który opcjonalnie definiuje kolejkowanie dla strumienia. Przyjmuje 2 parametry:

  • highWaterMark: nieujemna liczba wskazująca wysoki poziom okna transmisji używającej tej strategii kolejkowania.
  • size(chunk): funkcja, która oblicza i zwraca skończony nieujemny rozmiar danej wartości fragmentu. Wynik służy do określenia ciśnienia zwrotnego, które jest widoczne w odpowiedniej właściwości ReadableStreamDefaultController.desiredSize. Określa ona też, kiedy wywoływana jest metoda pull() źródła danych.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Metody getReader() i read()

Aby odczytać strumień do odczytu, potrzebujesz czytnika, którym będzie ReadableStreamDefaultReader. Metoda getReader() interfejsu ReadableStream tworzy czytnik i blokuje strumień. Gdy strumień jest zablokowany, nie można pozyskać żadnego innego czytnika, dopóki nie zostanie on zwolniony.

Metoda read() interfejsu ReadableStreamDefaultReader zwraca obietnicę dającą dostęp do następnego fragmentu w wewnętrznej kolejce strumienia. W zależności od stanu strumienia spełnia lub odrzuca z wynikiem. Możliwości są następujące:

  • Jeśli fragment jest dostępny, obietnica zostanie zrealizowana za pomocą obiektu formularza
    { value: chunk, done: false }.
  • Jeśli strumień zostanie zamknięty, obietnica zostanie spełniona za pomocą obiektu w formie
    { value: undefined, done: true }.
  • Jeśli strumień będzie zawierał błąd, obietnica zostanie odrzucona z odpowiednim komunikatem o błędzie.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Właściwość locked

Aby sprawdzić, czy strumień do odczytu jest zablokowany, otwórz obiekt ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

czytelne przykłady kodu strumienia danych.

Przykładowy kod poniżej pokazuje wszystkie te kroki w akcji. Najpierw tworzysz obiekt ReadableStream, który w argumencie underlyingSource (czyli klasa TimestampSource) definiuje metodę start(). Ta metoda informuje controller strumienia, aby co sekundę przez 10 sekund dodawał sygnaturę czasową do enqueue(). Na koniec informuje kontroler, że ma close() strumień. Aby wykorzystać ten strumień, utwórz czytnik za pomocą metody getReader() i wywołuj metodę read(), dopóki strumień nie będzie done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iteracja asynchroniczna

Sprawdzanie w każdej iteracji pętli read(), czy strumień jest done, może nie być najwygodniejszym interfejsem API. Na szczęście wkrótce istnieje lepszy sposób wykonywania tych czynności: iteracja asynchroniczna.

for await (const chunk of stream) {
  console.log(chunk);
}

Aby obecnie korzystać z iteracji asynchronicznej, możesz zaimplementować to zachowanie za pomocą polyfilla.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Odczytywanie strumienia danych

Metoda tee() interfejsu ReadableStream rozdziela bieżący odczytywalny strumień, zwracając tablicę o 2 elementach zawierającą 2 wynikowe gałęzie jako nowe instancje ReadableStream. Dzięki temu 2 czytelnicy mogą jednocześnie czytać strumień. Możesz to zrobić na przykład w skrypcie service worker, jeśli chcesz pobrać odpowiedź z serwera i przesłać ją strumieniowo do przeglądarki, ale też do pamięci podręcznej skryptu service worker. Ponieważ treści odpowiedzi nie można wykorzystać więcej niż raz, potrzebne są 2 kopie. Aby anulować strumień, musisz anulować oba powstałe rozgałęzienia. Ogólnie rzecz biorąc, strumień zostanie zablokowany na cały czas, przez co inni czytelnicy nie zablokują go.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Czytelne strumienie bajtów

W przypadku strumieni reprezentujących bajty udostępniana jest rozszerzona wersja czytelnego strumienia, która umożliwia efektywne przetwarzanie bajtów, w szczególności poprzez minimalizowanie kopii. Strumienie bajtów umożliwiają nabywanie czytników typu „przynieś własną pamięć podręczną” (BYOB). Domyślna implementacja może dawać różne dane wyjściowe, takie jak ciągi znaków lub tablice buforów w przypadku WebSockets, podczas gdy strumienie bajtów gwarantują dane wyjściowe w postaci bajtów. Oprócz tego czytelnicy BYOB mają korzyści ze stabilności. Dzieje się tak, ponieważ odłączenie bufora może zagwarantować, że nie będzie on dwa razy zapisywany w tym samym buforze, co pozwoli uniknąć sytuacji wyścigu. Czytniki BYOB mogą zmniejszyć liczbę razy, ile przeglądarka musi wykonać zbieranie elementów usuniętych z pamięci, ponieważ mogą ponownie używać buforów.

Tworzenie czytelnego strumienia bajtów

Aby utworzyć czytelny strumień bajtów, możesz przekazać dodatkowy parametr type do konstruktora ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Źródło strumienia bajtów do odczytu ma uprawnienia ReadableByteStreamController do manipulowania. Metoda ReadableByteStreamController.enqueue() przyjmuje argument chunk, którego wartość jest typu ArrayBufferView. Właściwość ReadableByteStreamController.byobRequest zwraca bieżące żądanie pull BYOB lub wartość null, jeśli nie ma żadnego. Wreszcie właściwość ReadableByteStreamController.desiredSize zwraca pożądany rozmiar, aby wypełnić kolejkę wewnętrzną kontrolowanego strumienia.

queuingStrategy

Drugim, podobnie jak opcjonalnym, argumentem konstruktora ReadableStream() jest queuingStrategy. Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania strumienia. Przyjmuje on jeden parametr:

  • highWaterMark: nieujemna liczba bajtów wskazująca wysoki poziom strumienia przy użyciu tej strategii kolejkowania. Służy to określeniu ciśnienia wstecznego, które jest widoczne w odpowiedniej właściwości ReadableByteStreamController.desiredSize. Określa ona też, kiedy wywoływana jest metoda pull() źródła danych.

metody getReader()read();

Następnie możesz uzyskać dostęp do ReadableStreamBYOBReader, odpowiednio ustawiając parametr mode: ReadableStream.getReader({ mode: "byob" }). Umożliwia to dokładniejsze kontrolowanie przydziału bufora, aby uniknąć kopiowania. Aby odczytać dane z potoku bajtów, musisz wywołać funkcję ReadableStreamBYOBReader.read(view), gdzie view to ArrayBufferView.

Przykładowy kod odczytujalnego strumienia bajtów

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Podana niżej funkcja zwraca czytelne strumienie bajtów, które umożliwiają wydajne odczytywanie tablicy wygenerowanej losowo bez kopiowania. Zamiast używać z góry określonego rozmiaru fragmentu (1024), próbuje wypełnić bufor dostarczony przez dewelopera, co zapewnia pełną kontrolę.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Mechanika strumienia z możliwością zapisu

Możliwość zapisu w strumieniu to miejsce docelowe, do którego możesz zapisywać dane reprezentowane w JavaScriptzie przez obiekt WritableStream. Stanowi ona abstrakcję na szczycie podstawy odbiornika, czyli odbiornika we/wy na niższym poziomie, do którego zapisywane są dane nieprzetworzone.

Dane są zapisywane w strumieniach za pomocą programu do zapisywania, po jednym kawałku naraz. Kawałek może przybierać wiele form, tak jak kawałki w czytniku. Do wygenerowania fragmentów gotowych do zapisania możesz użyć dowolnego kodu. Pisarz wraz z powiązanym kodem to producent.

Gdy nowy pisarz zaczyna pisać do strumienia (aktywny pisarz), mówi się, że jest zablokowany. Do strumienia do zapisu może pisać tylko 1 proces. Jeśli chcesz, aby inny pisarz zaczął pisać do Twojego strumienia, musisz go najpierw uwolnić, a potem dołączyć do niego innego pisarza.

Wewnętrzna kolejka zawiera informacje o fragmentach, które zostały zapisane w strumieniu, ale jeszcze nie zostały przetworzone przez ujście.

Strategia kolejkowania to obiekt określający sposób sygnalizowania obciążenia wstecznego strumienia na podstawie stanu jego wewnętrznej kolejki. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce ze wskazaną liczbą, zwaną wysoką wartością graniczną.

Ostateczna konstrukcja jest nazywana kontrolerem. Każdy strumień z możliwością zapisu ma powiązany kontroler, który umożliwia Ci sterowanie strumieniem (np. jego przerwanie).

Tworzenie strumienia z możliwością zapisu

Interfejs WritableStream interfejsu Streams API zapewnia standardową abstrakcję umożliwiającą zapisywanie danych strumieniowych w miejscu docelowym, zwanym odbiornikiem. Ten obiekt ma wbudowane zabezpieczenie przed przeciążeniem i kolejkowanie. Strumień do zapisu tworzysz, wywołując jego konstruktor: WritableStream(). Ma on opcjonalny parametr underlyingSink, który reprezentuje obiekt z metodami i właściwościami określającymi sposób działania utworzonej instancji strumienia.

underlyingSink

underlyingSink może zawierać te opcjonalne metody zdefiniowane przez dewelopera. Parametr controller przekazywany do niektórych metod jest zmienną typu WritableStreamDefaultController.

  • start(controller): ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Zawartość tej metody powinna umożliwiać uzyskanie dostępu do podstawowego odbiornika. Jeśli ten proces ma zostać wykonany asynchronicznie, może zwrócić obietnicę sukcesu lub niepowodzenia.
  • write(chunk, controller): ta metoda zostanie wywołana, gdy nowy fragment danych (określony w parametrze chunk) będzie gotowy do zapisania w podstawowym odbiorniku. Może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie operacji zapisu. Ta metoda jest wywoływana dopiero po pomyślnym zakończeniu wcześniejszych zapisów i nie po zamknięciu lub przerwaniu strumienia.
  • close(controller): ta metoda zostanie wywołana, jeśli aplikacja sygnalizuje, że zakończyła zapisywanie fragmentów do strumienia. Treści powinny zrobić wszystko, co jest konieczne, aby dokończyć zapisywanie do podstawowego odbiornika i udostępnić do niego dostęp. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę sukcesu lub niepowodzenia. Ta metoda jest wywoływana dopiero po pomyślnym zakończeniu wszystkich zapisów w kolejce.
  • abort(reason): ta metoda jest wywoływana, gdy aplikacja zasygnalizuje, że chce nagle zamknąć strumień i wprowadzić w niej błąd. Może ona wyczyścić wszystkie zablokowane zasoby, podobnie jak close(), ale metoda abort() zostanie wywołana nawet wtedy, gdy operacje zapisu są w kolejce. Te fragmenty zostaną odrzucone. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Parametr reason zawiera wartość DOMString, która opisuje, dlaczego transmisja została przerwana.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Interfejs interfejsu Streams API WritableStreamDefaultController reprezentuje kontroler, który umożliwia kontrolowanie stanu WritableStream podczas konfigurowania, gdy przesyłane są kolejne fragmenty do zapisu lub po zakończeniu zapisu. Podczas tworzenia obiektu WritableStream bazowe ujście otrzymuje odpowiednie wystąpienie WritableStreamDefaultController do manipulowania. WritableStreamDefaultController ma tylko jedną metodę: WritableStreamDefaultController.error(), co powoduje, że wszelkie przyszłe interakcje z powiązanym strumieniem powodują błąd. Obiekt WritableStreamDefaultController obsługuje też właściwość signal, która zwraca instancję AbortSignal, co umożliwia zatrzymanie operacji WritableStream w razie potrzeby.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Drugi, również opcjonalny, argument konstruktora WritableStream() to queuingStrategy. Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania strumienia. Przyjmuje ona 2 parametry:

  • highWaterMark: nieujemna liczba wskazująca wysoki poziom okna transmisji używającej tej strategii kolejkowania.
  • size(chunk): funkcja, która oblicza i zwraca skończony, nieujemny rozmiar danej wartości fragmentu. Wynik służy do określania naprężenia wstecznego, które pojawia się za pomocą odpowiedniej właściwości WritableStreamDefaultWriter.desiredSize.

metody getWriter()write();

Aby pisać w strumieniu z możliwością zapisu, musisz mieć scenarzystę, którym będzie WritableStreamDefaultWriter. Metoda getWriter() interfejsu WritableStream zwraca nową instancję WritableStreamDefaultWriter i blokuje strumień w tej instancji. Gdy strumień jest zablokowany, nie można pozyskać innego autora, dopóki bieżący nie zostanie zwolniony.

Metoda write() interfejsu WritableStreamDefaultWriter zapisuje przekazany fragment danych w obiekcie WritableStream i jego docelowym odbiorniku, a potem zwraca obietnicę, która wskazuje na powodzenie lub niepowodzenie operacji zapisu. Pamiętaj, że to, co oznacza sukces, zależy od ujścia źródła. Może to oznaczać, że fragment został zaakceptowany, a niekoniecznie, że został bezpiecznie zapisany w ostatecznym miejscu docelowym.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Właściwość locked

Aby sprawdzić, czy strumień z możliwością zapisu jest zablokowany, otwórz jego właściwość WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Przykład kodu strumienia z możliwością zapisu

Przykładowy kod poniżej pokazuje wszystkie kroki.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Przypisanie czytelnego strumienia do strumienia z możliwością zapisu

Strumień do odczytu można przekierować do strumienia do zapisu za pomocą metody pipeTo() strumienia do odczytu. ReadableStream.pipeTo() przekształca wartość ReadableStream w dany WritableStream i zwraca obietnicę, która zostanie zrealizowana po pomyślnym zakończeniu procesu potoku, lub odrzuca, jeśli wystąpią błędy.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Tworzenie strumienia przekształcenia

Interfejs TransformStream interfejsu Streams API reprezentuje zestaw danych, które można przekształcać. Strumień przekształcenia możesz utworzyć, wywołując jego konstruktor TransformStream(), który tworzy i zwraca obiekt strumienia przekształcenia z podanych modułów obsługi. Konstruktor TransformStream() przyjmuje jako pierwszy argument opcjonalny obiekt JavaScript reprezentujący transformer. Takie obiekty mogą zawierać dowolną z tych metod:

transformer

  • start(controller): ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Zwykle służy to do umieszczania w kole fragmentów prefiksu za pomocą parametru controller.enqueue(). Te fragmenty będą odczytywane z czytalnej strony, ale nie będą zależeć od zapisów na stronie z możliwością zapisu. Jeśli ten początkowy proces jest asynchroniczny, na przykład dlatego, że wymaga pewnych działań, aby pobrać fragmenty prefiksu, funkcja może zwrócić obietnicę, aby zasygnalizować sukces lub niepowodzenie. Odrzucona obietnica spowoduje błąd strumienia. Konstruktor TransformStream() ponownie rzuca wszystkie wyjątki.
  • transform(chunk, controller): ta metoda jest wywoływana, gdy nowy fragment zapisany pierwotnie po stronie zapisu jest gotowy do przekształcenia. Implementacja strumienia gwarantuje, że ta funkcja będzie wywoływana tylko po pomyślnym przekształceniu poprzednich elementów i nigdy przed zakończeniem działania funkcji start() ani po wywołaniu funkcji flush(). Ta funkcja wykonuje rzeczywistą przekształcenie strumienia przekształcenia. Może ona umieszczać wyniki w kolejce za pomocą funkcji controller.enqueue(). Dzięki temu pojedynczy fragment zapisany po stronie zapisu może spowodować, że po stronie odczytu nie będzie żadnych fragmentów lub będzie ich kilka, w zależności od tego, ile razy zostanie wywołana funkcja controller.enqueue(). Jeśli proces przekształcania jest asynchroniczny, ta funkcja może zwrócić obietnicę sukcesu lub niepowodzenia przekształcenia. Odrzucone obietnice spowodują błąd zarówno po stronie odczytu, jak i zapisu strumienia transformacji. Jeśli nie podano metody transform(), używane jest przekształcenie tożsamości, które dodaje do kolejki fragmenty bez zmian ze strony możliwej do zapisu na czytelną.
  • flush(controller): ta metoda jest wywoływana po tym, jak wszystkie fragmenty zapisane po stronie z możliwością zapisu zostały przekształcone przez udane przekazanie przez transform(), a strona z możliwością zapisu ma zostać zamknięta. Zwykle służy to do umieszczania w kolejce fragmentów sufiksów po stronie czytelniczej, zanim zostanie ona zamknięta. Jeśli proces opróżniania jest asynchroniczny, funkcja może zwrócić obietnicę sukcesu lub niepowodzenia. Wynik zostanie przekazany do elementu wywołującego stream.writable.write(). Dodatkowo odrzucona obietnica powoduje błędy zarówno w czytelnej, jak i dostępnej do zapisu strony strumienia. Wyjątek jest traktowany tak samo jak odrzucenie obietnicy.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Strategie kolejkowania: writableStrategy i readableStrategy

Drugi i trzeci opcjonalny parametr konstruktora TransformStream() to odpowiednio strategie kolejkowania writableStrategyreadableStrategy. Są one zdefiniowane odpowiednio w sekcjach strumienia do odczytu i zapisu.

Przykładowy kod strumienia przekształcania

Poniższy przykładowy kod pokazuje działanie prostego przekształcenia strumienia.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Przesyłanie czytelnego strumienia przez strumień transformacji

Metoda pipeThrough() interfejsu ReadableStream umożliwia łańcuchowe przesyłanie bieżącego strumienia przez strumień przekształcenia lub dowolną inną parę zapisu/odczytu. Zastosowanie potoku powoduje zwykle zablokowanie go na czas jego trwania, przez co inni czytelnicy nie mogą go zablokować.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Następny przykładowy kod (nieco nieco wymyślony) pokazuje, jak można wdrożyć wersję fetch() z „krzyczącymi” tekstami, która zamienia cały tekst na wielkie litery, przy użyciu zwróconej obietnicy odpowiedzi jako strumienia i fragmentu po fragmencie wielką literą. Zaletą tego podejścia jest to, że nie musisz czekać na pobranie całego dokumentu, co może mieć ogromne znaczenie w przypadku dużych plików.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Prezentacja

Demonstracja poniżej pokazuje działanie strumieni czytalnych, zapisywalnych i przekształcających. Zawiera też przykłady pipeThrough() i pipeTo() łańcuchów rur oraz demonstruje tee(). Opcjonalnie możesz uruchomić demonstrację w osobnym oknie lub wyświetlić kod źródłowy.

Przydatne strumienie dostępne w przeglądarce

W przeglądarce jest wiele przydatnych strumieni. Możesz łatwo utworzyć zbiórReadableStream z bloba. Metoda stream() interfejsu Blob zwraca ReadableStream, który po odczycie zwraca dane zawarte w blobie. Pamiętaj też, że obiekt File to konkretny rodzaj obiektu Blob i można go użyć w dowolnym kontekście, w którym może działać obiekt blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Wersje strumieniowe TextDecoder.decode() i TextEncoder.encode() nazywają się odpowiednio TextDecoderStream i TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Kompresowanie i rozpakowywanie plików jest proste dzięki strumieniom transformacji CompressionStreamDecompressionStream. Poniższy przykładowy kod pokazuje, jak pobrać specyfikację strumieni, skompresować (gzip) ją w przeglądarce i zapisać skompresowany plik bezpośrednio na dysku.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Interfejs File System Access API FileSystemWritableFileStream i eksperymentalne strumień zapytań fetch() to przykłady strumienka z możliwością zapisu w praktyce.

Interfejs Serial API intensywnie korzysta zarówno ze strumieni do odczytu, jak i do zapisu.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

I wreszcie interfejs API WebSocketStream integruje strumienie z interfejsem WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Przydatne materiały

Podziękowania

Ten artykuł został sprawdzony przez Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley i Adam Rice. Artykuły w blogu Jake'a Archibalda bardzo mi pomogły w zrozumieniu strumieni danych. Niektóre przykłady kodu zostały zainspirowane eksperymentami użytkownika GitHuba @bellbind, a część tekstu opiera się na dokumentacji MDN na temat strumieni danych. Autorzy standardu Streams Standard wykonali świetną pracę, tworząc te specyfikacje. Obraz główny pochodzi od Ryana Lary z Unsplash.