Strumienie – kompleksowy przewodnik

Dowiedz się, jak używać strumieni odczytu, zapisu i przekształcania za pomocą interfejsu Streams API.

Interfejs Streams API umożliwia programowy dostęp do strumieni danych otrzymanych przez sieć lub utworzonych lokalnie za pomocą dowolnych środków, a także ich przetwarzanie za pomocą kodu JavaScript. Strumieniowanie polega na rozbiciu zasobu, który chcesz otrzymać, wysłać lub przekształcić, na małe fragmenty, a następnie przetwarzaniu tych fragmentów bit po bicie. Chociaż strumieniowanie jest czymś, co przeglądarki robią, gdy otrzymują zasoby, takie jak HTML lub filmy, aby wyświetlać je na stronach internetowych, ta funkcja nigdy nie była dostępna dla JavaScriptu przed wprowadzeniem w 2015 r. strumieniowania.fetch

Wcześniej, jeśli chciałeś przetworzyć jakiś zasób (np. film lub plik tekstowy), musiałeś pobrać cały plik, poczekać, aż zostanie on zdeserializowany do odpowiedniego formatu, a potem przetworzyć go. Wszystko się zmienia, gdy strumienie są dostępne dla JavaScript. Teraz możesz przetwarzać dane nieprzetworzone za pomocą JavaScriptu stopniowo, gdy tylko będą dostępne na kliencie, bez konieczności generowania bufora, ciągu znaków ani bloba. Umożliwia to wiele zastosowań, w tym:

  • Efekty wideo: przesyłanie czytelnego strumienia wideo przez strumień transformacji, który stosuje efekty w czasie rzeczywistym.
  • (De)kompresowanie danych: przekierowanie strumienia plików przez strumień transformacji, który selektywnie (de)kompresuje dane.
  • Dekodowanie obrazu: przekierowanie strumienia odpowiedzi HTTP przez strumień transformacji, który dekoduje bajty na dane bitmapy, a następnie przez inny strumień transformacji, który przekształca bitmapy w pliki PNG. Jeśli jest zainstalowany w obiekcie fetch w ramach usługi, umożliwia to przejrzyste wypełnianie nowych formatów obrazów, takich jak AVIF.

Obsługa przeglądarek

ReadableStream i WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

Podstawowe pojęcia

Zanim omówię szczegółowo różne typy strumieni, przedstawię kilka podstawowych pojęć.

Chunks

Kawałek to pojedynczy element danych zapisywany w strumieniu lub z niego odczytywany. Może to być dowolny typ; strumienie mogą zawierać fragmenty różnych typów. W większości przypadków fragment nie będzie najbardziej elementarną jednostką danych w danym strumieniu. Na przykład strumień bajtów może zawierać fragmenty o wielkości 16 KiBUint8Array zamiast pojedynczych bajtów.

Czytelne strumienie

Czytalny strumień to źródło danych, z którego możesz odczytywać dane. Innymi słowy, dane wychodzą z czytelnego strumienia. Strumień tekstowy to konkretnie instancja klasy ReadableStream.

Strumienie z możliwością zapisu

Możliwość zapisu w strumieniu oznacza, że jest to miejsce docelowe danych, do których możesz zapisywać. Inaczej mówiąc, dane przechodzą do strumienia do zapisu. Strumień do zapisu to instancja klasy WritableStream.

Przekształcanie strumieni

Strumień transformacji składa się z 2 strumieni: strumienia do zapisu (zwanego stroną do zapisu) i strumienia do odczytu (zwanego stroną do odczytu). W realnym świecie metaforą tego jest tłumacz symultaniczny, który na bieżąco tłumaczy z jednego języka na inny. W sposób specyficzny dla strumienia transformacji zapisywanie na stronie z możliwością zapisu powoduje, że nowe dane są dostępne do odczytu po stronie z możliwością odczytu. Konkretnie dowolny obiekt z właściwością writable i właściwością readable może służyć jako strumień transformacji. Standardowa klasa TransformStream ułatwia jednak tworzenie par, które są prawidłowo splecione.

Łańcuchy do rur

Strumienie są głównie używane do przesyłania ich do siebie nawzajem. Strumień do odczytu można przekierować bezpośrednio do strumienia do zapisu, używając metody pipeTo() strumienia do odczytu. Można go też przekierować przez jeden lub więcej strumieni transformacji, używając metody pipeThrough() strumienia do odczytu. Zbiór połączonych strumieni nazywamy łańcuchem.

Ciśnienie wsteczne

Gdy łańcuch przewodów zostanie utworzony, będzie rozpowszechniać sygnały dotyczące tego, jak szybko powinny przepływać przez niego elementy. Jeśli którykolwiek z etapów w łańcuchu nie może jeszcze przyjąć fragmentów, rozprzestrzenia on sygnał w przeciwnym kierunku przez cały łańcuch, aż do momentu, gdy pierwotne źródło otrzyma informację o zaprzestaniu tak szybkiego generowania fragmentów. Ten proces normalizacji przepływu nazywa się odwrotnym ciśnieniem.

Teeing

Przepływ danych do odczytu może być rozdzielany (nazwa pochodzi od kształtu litery „T” w wielkiej literze) za pomocą metody tee(). Spowoduje to zablokowanie strumienia, co oznacza, że nie będzie można go używać bezpośrednio. Spowoduje to jednak utworzenie 2 nowych strumieni, zwanych gałęziami, które można odtwarzać niezależnie. Ważne jest też to, że strumieni nie można cofnąć ani ponownie uruchomić. Więcej informacji na ten temat znajdziesz poniżej.

Schemat łańcucha przekierowań składającego się z czytelnego strumienia pochodzącego z wywołania interfejsu fetch API, który jest następnie przekierowywany przez strumień transformacji, którego dane wyjściowe są rozdzielane i wysyłane do przeglądarki (dla pierwszego wynikowego strumienia czytelnego) oraz do pamięci podręcznej serwisu workera (dla drugiego wynikowego strumienia czytelnego).
Potok w łańcuchu.

Mechanika czytelnego strumienia

Czytelny strumień to źródło danych reprezentowane w JavaScriptie przez obiekt ReadableStream, który pochodzi z podstawowego źródła. Konstruktor ReadableStream()tworzy i zwraca obiekt strumienia do odczytu na podstawie podanych modułów obsługi. Istnieją 2 rodzaje źródeł danych:

  • Źródła push przesyłają dane do Ciebie, gdy tylko uzyskasz do nich dostęp. To od Ciebie zależy, czy chcesz rozpocząć, wstrzymać lub anulować dostęp do strumienia. Przykłady to strumienie wideo na żywo, zdarzenia wysyłane przez serwer lub WebSockets.
  • Źródła danych typu „pull” wymagają, aby po nawiązaniu połączenia z nimi wysłać do nich wyraźne żądanie danych. Przykłady obejmują operacje HTTP za pomocą wywołań fetch() lub XMLHttpRequest.

Dane strumienia są odczytywane sekwencyjnie w mniejszych częściach zwanych fragmentami. Fragmenty umieszczone w strumieniu są wstawiane do kolejki. Oznacza to, że są one w kolejce i gotowe do odczytania. Kolejka wewnętrzna śledzi fragmenty, które nie zostały jeszcze przeczytane.

Strategia kolejkowania to obiekt, który określa, jak strumień powinien sygnalizować odwrotny nacisk na podstawie stanu swojej kolejki wewnętrznej. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce ze wskazaną liczbą, zwaną wysoką wartością graniczną.

Fragmenty w strumieniu są odczytywane przez czytnik. Ten czytnik pobiera dane po kawałku, co pozwala na wykonywanie na nich dowolnych operacji. Czytnik wraz z innym kodem przetwarzania, który jest z nim powiązany, jest nazywany konsumentem.

Kolejna konstrukcja w tym kontekście nazywa się kontroler. Każdy odczytywalny strumień ma powiązany kontroler, który, jak sama nazwa wskazuje, umożliwia sterowanie tym strumieniem.

Strumień może odczytywać tylko 1 czytnik naraz. Gdy czytnik zostanie utworzony i zacznie odczytywać strumień (czyli stanie się aktywnym czytnikiem), zostaje zablokowany. Jeśli chcesz, aby inny czytelnik przejął czytanie strumienia, musisz najpierw zwolnić pierwszego czytelnika (chociaż możesz rozgałęzić strumień).

Tworzenie czytelnego strumienia

Aby utworzyć strumień do odczytu, wywołaj jego konstruktor: ReadableStream(). Konstruktor ma opcjonalny argument underlyingSource, który reprezentuje obiekt z metodami i właściwościami definiującymi zachowanie utworzonej instancji strumienia.

underlyingSource

Może to odbywać się za pomocą tych opcjonalnych metod zdefiniowanych przez deweloperów:

  • start(controller): wywoływana natychmiast po utworzeniu obiektu. Metoda może uzyskać dostęp do źródła strumienia i wykonywać inne czynności wymagane do skonfigurowania funkcji strumienia. Jeśli ten proces ma być wykonywany asynchronicznie, metoda może zwracać obietnicę, aby zasygnalizować sukces lub niepowodzenie. Parametr controller przekazany do tej metody to a ReadableStreamDefaultController.
  • pull(controller): można go używać do kontrolowania strumienia, gdy pobierane są kolejne fragmenty. Jest on wywoływany wielokrotnie, dopóki wewnętrzna kolejka fragmentów strumienia nie jest pełna, aż do osiągnięcia najwyższego poziomu. Jeśli wywołanie funkcji pull() zwróci obietnicę, funkcja pull() nie zostanie wywołana ponownie, dopóki nie zostanie spełniona obietnicą. Jeśli obietnica zostanie odrzucona, strumień będzie zawierał błąd.
  • cancel(reason): jest wywoływany, gdy odbiorca strumienia anuluje strumień.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController obsługuje te metody:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Drugi, również opcjonalny, argument konstruktora ReadableStream() to queuingStrategy. Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania dla strumienia. Przyjmuje on 2 parametry:

  • highWaterMark: nieujemna liczba wskazująca wysoki poziom ścieżki audio, która korzysta z tej strategii kolejkowania.
  • size(chunk): funkcja, która oblicza i zwraca skończony nieujemny rozmiar danej wartości fragmentu. Wynik służy do określania ciśnienia zwrotnego, które jest widoczne w odpowiedniej właściwości ReadableStreamDefaultController.desiredSize. Określa ona też, kiedy wywoływana jest metoda pull() źródła danych.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Metody getReader()read()

Aby odczytać strumień do odczytu, potrzebujesz czytnika, którym będzie ReadableStreamDefaultReader. Metoda getReader() interfejsu ReadableStream tworzy czytnik i blokuje strumień. Podczas blokowania strumienia żaden inny czytnik nie może zostać pozyskany, dopóki ten nie zostanie zwolniony.

Metoda read() interfejsu ReadableStreamDefaultReader zwraca obietnicę zapewniającą dostęp do następnego fragmentu w wewnętrznej kolejce strumienia. W zależności od stanu strumienia może ona spełnić lub odrzucić dane. Możliwości są następujące:

  • Jeśli fragment jest dostępny, obietnica zostanie spełniona za pomocą obiektu w formie
    { value: chunk, done: false }.
  • Jeśli strumień zostanie zamknięty, obietnica zostanie spełniona za pomocą obiektu w formie
    { value: undefined, done: true }.
  • Jeśli strumień będzie zawierał błąd, obietnica zostanie odrzucona z odpowiednim komunikatem o błędzie.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Właściwość locked

Aby sprawdzić, czy strumień do odczytu jest zablokowany, otwórz obiekt ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Czytelne przykłady kodu strumienia

Przykładowy kod poniżej pokazuje wszystkie te kroki w akcji. Najpierw tworzysz klasę ReadableStream, która w swoim argumencie underlyingSource (czyli klasie TimestampSource) definiuje metodę start(). Ta metoda informuje controller strumienia, abyenqueue() dodawał sygnaturę czasową co sekundę przez 10 sekund. Na koniec przekazuje kontrolerowi polecenie close() strumienia. Odczytujesz ten strumień, tworząc czytnik za pomocą metody getReader() i wywołując read(), aż strumień będzie done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iteracja asynchroniczna

Sprawdzanie w każdej iteracji pętli read(), czy strumień jest done, może nie być najwygodniejszym interfejsem API. Na szczęście wkrótce pojawi się lepszy sposób: iteracja asynchroniczna.

for await (const chunk of stream) {
  console.log(chunk);
}

Aby obecnie korzystać z iteracji asynchronicznej, należy zaimplementować to zachowanie za pomocą polyfilla.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Odgałęzienie strumienia do odczytu

Metoda tee() interfejsu ReadableStream rozdziela bieżący odczytywalny strumień, zwracając tablicę o 2 elementach zawierającą 2 wynikowe gałęzie jako nowe instancje ReadableStream. Dzięki temu 2 czytelnicy mogą jednocześnie czytać ten sam strumień. Możesz to zrobić na przykład w skrypcie service worker, jeśli chcesz pobrać odpowiedź z serwera i przesłać ją strumieniowo do przeglądarki, a także do pamięci podręcznej skryptu service worker. Ponieważ treści odpowiedzi nie można wykorzystać więcej niż raz, musisz utworzyć 2 kopie. Aby anulować strumień, musisz anulować oba powstałe rozgałęzienia. Przekierowanie strumienia zablokuje go na czas trwania, uniemożliwiając innym użytkownikom zablokowanie go.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

czytelne strumienie bajtów;

W przypadku strumieni reprezentujących bajty udostępniana jest rozszerzona wersja czytelnego strumienia, która umożliwia efektywne przetwarzanie bajtów, w szczególności poprzez minimalizowanie kopii. Strumienie bajtów umożliwiają nabywanie czytników typu „przynieś własną pamięć podręczną” (BYOB). Domyślna implementacja może dawać różne dane wyjściowe, np. ciągi znaków lub tablice buforów w przypadku WebSockets, podczas gdy strumienie bajtów gwarantują dane wyjściowe w postaci bajtów. Ponadto czytelnicy mogą korzystać z dodatkowych funkcji stabilności. Jeśli bufor się odłączy, może to spowodować, że nie zapiszesz dwukrotnie do tego samego bufora, co zapobiegnie sytuacjom wyścigu. Czytniki BYOB mogą zmniejszyć liczbę wywołań funkcji zbierania elementów usuniętych z pamięci przez przeglądarkę, ponieważ mogą ponownie używać buforów.

Tworzenie czytelnego strumienia bajtów

Możesz utworzyć czytelny strumień bajtów, przekazując dodatkowy parametr type do konstruktora ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Źródło danych strumienia bajtów do odczytu otrzymuje uprawnienia ReadableByteStreamController do manipulowania. Metoda ReadableByteStreamController.enqueue() przyjmuje argument chunk, którego wartość jest typu ArrayBufferView. Właściwość ReadableByteStreamController.byobRequest zwraca bieżącą prośbę o przeniesienie do repozytorium BYOB lub wartość null, jeśli nie ma takiej prośby. Wreszcie właściwość ReadableByteStreamController.desiredSize zwraca pożądany rozmiar, aby wypełnić kolejkę wewnętrzną kontrolowanego strumienia.

queuingStrategy

Drugi, również opcjonalny, argument konstruktora ReadableStream() to queuingStrategy. Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania dla strumienia. Przyjmuje on jeden parametr:

  • highWaterMark: nieujemna liczba bajtów wskazująca wysoki poziom strumienia korzystającego z tej strategii kolejkowania. Służy on do określania ciśnienia wstecznego, które jest widoczne w odpowiedniej właściwości ReadableByteStreamController.desiredSize. Określa ona też, kiedy wywoływana jest metoda pull() źródła danych.

Metody getReader()read()

Następnie możesz uzyskać dostęp do ReadableStreamBYOBReader, odpowiednio ustawiając parametr mode:ReadableStream.getReader({ mode: "byob" }). Umożliwia to dokładniejszą kontrolę alokacji bufora, aby uniknąć kopiowania. Aby odczytać dane z strumienia bajtów, musisz wywołać funkcję ReadableStreamBYOBReader.read(view), gdzie view to ArrayBufferView.

Przykładowy kod strumienia bajtów w czytelnej postaci

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Podana niżej funkcja zwraca czytelne strumienie bajtów, które umożliwiają wydajne odczytywanie losowo wygenerowanej tablicy bez kopiowania. Zamiast używać z góry określonego rozmiaru fragmentu (1024), próbuje wypełnić bufor dostarczony przez dewelopera, co zapewnia pełną kontrolę.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Mechanika strumienia umożliwiającego zapis

Możliwość zapisu w strumieniach to miejsce docelowe, do którego możesz zapisywać dane. W języku JavaScript reprezentuje je obiekt WritableStream. Jest to abstrakcja na szczycie podstawy odbiornika – odbiornika we/wy na niższym poziomie, do którego zapisywane są dane nieprzetworzone.

Dane są zapisywane w strumieniach za pomocą programu do zapisu, po jednym fragmencie na raz. Kawałek może przybierać wiele form, tak jak kawałki w czytniku. Do wygenerowania fragmentów gotowych do zapisania możesz użyć dowolnego kodu. Pisarz wraz z powiązanym kodem to producent.

Gdy nowy skrypt zostanie utworzony i zacznie zapisywać dane do strumienia (aktywny skrypt zapisuje dane), mówimy, że jest zablokowany na potrzeby tego strumienia. Do strumienia do zapisu może pisać tylko 1 program naraz. Jeśli chcesz, aby inny pisarz zaczął pisać do Twojego strumienia, musisz go najpierw uwolnić, a potem dołączyć do niego innego pisarza.

Kolejka wewnętrzna śledzi fragmenty zapisane w strumieniach, które nie zostały jeszcze przetworzone przez docelowy odbiornik.

Strategia kolejkowania to obiekt, który określa, jak strumień powinien sygnalizować odwrotny nacisk na podstawie stanu swojej kolejki wewnętrznej. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce ze wskazaną liczbą, zwaną wysoką wartością graniczną.

Ostateczna konstrukcja nazywa się kontrolerem. Każdy strumień do zapisu ma powiązany kontroler, który umożliwia sterowanie tym strumieniem (np. jego przerwanie).

Tworzenie strumienia do zapisu

Interfejs WritableStream interfejsu Streams API zapewnia standardową abstrakcję do zapisywania danych strumieniowych w miejscu docelowym, zwanym odbiornikiem. Ten obiekt ma wbudowane zabezpieczenie przed przeciążeniem i kolejkowanie. Strumień do zapisu tworzysz, wywołując jego konstruktor: WritableStream(). Ma opcjonalny parametr underlyingSink, który reprezentuje obiekt z metodami i właściwościami definiującymi zachowanie utworzonej instancji strumienia.

underlyingSink

Element underlyingSink może zawierać te opcjonalne metody zdefiniowane przez dewelopera: Parametr controller przekazywany do niektórych metod jest zmienną typu WritableStreamDefaultController.

  • start(controller): ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Treść tej metody powinna umożliwiać uzyskanie dostępu do podstawowego odbiornika. Jeśli ten proces ma być wykonywany asynchronicznie, może zwrócić obietnicę, która sygnalizuje powodzenie lub niepowodzenie.
  • write(chunk, controller): ta metoda zostanie wywołana, gdy nowy fragment danych (określony w parametrze chunk) będzie gotowy do zapisania w podstawowym odbiorniku. Może zwrócić obietnicę sygnalizującą powodzenie lub niepowodzenie operacji zapisu. Ta metoda zostanie wywołana tylko po pomyślnym zapisaniu poprzednich danych, nigdy po zamknięciu lub przerwaniu strumienia.
  • close(controller): ta metoda zostanie wywołana, jeśli aplikacja sygnalizuje, że zakończyła zapisywanie fragmentów do strumienia. Treści powinny wykonać wszystkie niezbędne czynności, aby dokończyć zapisywanie do podstawowego odbiornika i udostępnić do niego dostęp. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Ta metoda zostanie wywołana dopiero po pomyślnym zapisaniu wszystkich danych z kolejki.
  • abort(reason): ta metoda zostanie wywołana, jeśli aplikacja sygnalizuje, że chce gwałtownie zamknąć strumień i ustawić go w stanie błędu. Może ona usunąć wszystkie zablokowane zasoby, podobnie jak close(), ale metoda abort() zostanie wywołana nawet wtedy, gdy operacje zapisu są w kolejce. Te fragmenty zostaną odrzucone. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Parametr reason zawiera wartość DOMString, która opisuje, dlaczego transmisja została przerwana.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Interfejs interfejsu Streams API WritableStreamDefaultController reprezentuje kontroler, który umożliwia kontrolowanie stanu WritableStream podczas konfigurowania, gdy przesyłane są kolejne fragmenty do zapisu lub po zakończeniu zapisu. Podczas tworzenia WritableStream do podstawowego odbiornika przekazywany jest odpowiedni obiekt WritableStreamDefaultController, którym można manipulować. WritableStreamDefaultController ma tylko jedną metodę: WritableStreamDefaultController.error(), co powoduje, że wszelkie przyszłe interakcje z powiązanym strumieniem powodują błąd. Obiekt WritableStreamDefaultController obsługuje też właściwość signal, która zwraca instancję AbortSignal, co umożliwia w razie potrzeby zatrzymanie operacji WritableStream.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Drugi, również opcjonalny, argument konstruktora WritableStream() to queuingStrategy. Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania dla strumienia. Przyjmuje on 2 parametry:

  • highWaterMark: nieujemna liczba wskazująca wysoki poziom ścieżki audio, która korzysta z tej strategii kolejkowania.
  • size(chunk): funkcja, która oblicza i zwraca skończony nieujemny rozmiar danej wartości fragmentu. Wynik służy do określania ciśnienia zwrotnego, które jest widoczne w odpowiedniej właściwości WritableStreamDefaultWriter.desiredSize.

Metody getWriter()write()

Aby zapisywać dane w strumieniach do zapisu, potrzebujesz elementu zapisującego, którym jest WritableStreamDefaultWriter. Metoda getWriter() interfejsu WritableStream zwraca nową instancję WritableStreamDefaultWriter i blokuje strumień w tej instancji. Podczas blokowania strumienia nie można użyć innego narzędzia do pisania, dopóki nie zostanie zwolniony bieżący strumień.

Metoda write() interfejsu WritableStreamDefaultWriter zapisuje przekazany fragment danych w obiekcie WritableStream i jego docelowym odbiorniku, a potem zwraca obietnicę, która wskazuje na powodzenie lub niepowodzenie operacji zapisu. Pamiętaj, że znaczenie terminu „sukces” zależy od podstawowego miejsca docelowego. Może on oznaczać, że fragment został zaakceptowany, ale niekoniecznie, że został bezpiecznie zapisany w miejscu docelowym.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Właściwość locked

Aby sprawdzić, czy strumień z możliwością zapisu jest zablokowany, otwórz jego właściwość WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Przykład kodu strumienia z możliwością zapisu

Przykładowy kod poniżej pokazuje wszystkie kroki.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Przesyłanie strumienia do odczytu do strumienia do zapisu

Strumień do odczytu można przekierować do strumienia do zapisu za pomocą metody pipeTo() strumienia do odczytu. ReadableStream.pipeTo() przekazuje bieżącą wartość ReadableStream do podanej wartości WritableStream i zwraca obietnicę, która jest spełniana, gdy proces przekazywania zakończy się pomyślnie, lub odrzucana, jeśli wystąpiły jakieś błędy.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Tworzenie strumienia przekształcenia

Interfejs TransformStream interfejsu Streams API reprezentuje zestaw danych, które można przekształcać. Strumień transformacji tworzysz, wywołując jego konstruktor TransformStream(), który tworzy i zwraca obiekt strumienia transformacji na podstawie podanych modułów obsługi. Konstruktor TransformStream() przyjmuje jako pierwszy argument opcjonalny obiekt JavaScript reprezentujący transformer. Takie obiekty mogą zawierać dowolną z tych metod:

transformer

  • start(controller): ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Zwykle służy to do umieszczania w kole fragmentów prefiksu za pomocą controller.enqueue(). Te fragmenty będą odczytywane z czytalnej strony, ale nie będą zależeć od zapisów na stronie z możliwością zapisu. Jeśli ten początkowy proces jest asynchroniczny, na przykład dlatego, że wymaga pewnych działań, aby pobrać fragmenty prefiksu, funkcja może zwrócić obietnicę, aby zasygnalizować sukces lub niepowodzenie. Odrzucona obietnica spowoduje błąd strumienia. Konstruktor TransformStream() ponownie rzuca wszystkie wyjątki.
  • transform(chunk, controller): ta metoda jest wywoływana, gdy nowy fragment zapisany pierwotnie po stronie zapisu jest gotowy do przekształcenia. Implementacja strumienia gwarantuje, że ta funkcja będzie wywoływana tylko po pomyślnym wykonaniu poprzednich przekształceń i nigdy przed zakończeniem działania funkcji start() ani po wywołaniu funkcji flush(). Ta funkcja wykonuje właściwą pracę związaną z przekształcaniem danych w strumieniach przekształcania. Może kolejkować wyniki za pomocą funkcji controller.enqueue(). Dzięki temu pojedynczy fragment zapisany po stronie zapisu może spowodować, że po stronie odczytu nie będzie żadnych fragmentów lub będzie ich kilka, w zależności od tego, ile razy zostanie wywołana funkcja controller.enqueue(). Jeśli proces przetwarzania jest asynchroniczny, ta funkcja może zwrócić obietnicę, która sygnalizuje powodzenie lub niepowodzenie przetwarzania. Odrzucona obietnica spowoduje błąd zarówno po stronie odczytu, jak i zapisu strumienia transformacji. Jeśli nie podasz metody transform(), zostanie użyta transformacja tożsamości, która umieszcza w kole elementy bez zmian z strony z możliwością zapisu na stronie z możliwością odczytu.
  • flush(controller): ta metoda jest wywoływana, gdy wszystkie fragmenty zapisane po stronie z możliwością zapisu zostały przekształcone przez pomyślne przekazanie przez transform(), a strona z możliwością zapisu ma zostać zamknięta. Zwykle służy to do umieszczania w kolejce fragmentów sufiksów po stronie czytelniczej, zanim zostanie ona zamknięta. Jeśli proces czyszczenia jest asynchroniczny, funkcja może zwrócić obietnicę, aby zasygnalizować sukces lub niepowodzenie. Wynik zostanie przekazany do wywołującego funkcji stream.writable.write(). Odrzucona obietnica spowoduje błąd zarówno po stronie odczytu, jak i zapisu strumienia. Wyjątek jest traktowany tak samo jak odrzucenie obietnicy.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Strategie kolejkowania writableStrategyreadableStrategy

Drugi i trzeci opcjonalny parametr konstruktora TransformStream() to odpowiednio writableStrategyreadableStrategy. Są one zdefiniowane odpowiednio w sekcji dotyczącej strumieni do odczytudo zapisu.

Przykładowy kod przekształcania strumienia

Poniższy przykładowy kod pokazuje działanie prostego przekształcenia strumienia.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Przesyłanie strumienia czytelnego przez strumień transformacji

Metoda pipeThrough() interfejsu ReadableStream umożliwia łańcuchowe przesyłanie bieżącego strumienia przez strumień transformacji lub dowolną inną parę zapisu/odczytu. Przekierowanie strumienia zablokuje go na czas przekierowania, uniemożliwiając innym czytelnikom zablokowanie go.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Następujący przykładowy kod (trochę sztuczny) pokazuje, jak zaimplementować wersję fetch(), która powoduje pisanie wielkimi literami całego tekstu, wykorzystując zwróconą obietnicę odpowiedzi jako strumień i zamieniając na wielkie litery poszczególne fragmenty. Zaletą tego podejścia jest to, że nie musisz czekać na pobranie całego dokumentu, co może mieć ogromne znaczenie w przypadku dużych plików.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Prezentacja

Demo poniżej pokazuje działanie strumieni do odczytu, do zapisu i przekształcania. Zawiera też przykłady pipeThrough() i pipeTo() łańcuchów do rur oraz demonstruje tee(). Opcjonalnie możesz uruchomić demo w osobnym oknie lub wyświetlić kod źródłowy.

przydatne strumienie dostępne w przeglądarce;

W przeglądarce jest wiele przydatnych strumieni. Możesz łatwo utworzyć obiekt blob typu ReadableStream. Metoda stream() interfejsu Blob zwraca ReadableStream, który po odczycie zwraca dane zawarte w blobie. Pamiętaj też, że obiekt File jest określonym rodzajem obiektu Blob i może być używany w dowolnym kontekście, w którym można użyć obiektu blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Wersje strumieniowe plików TextDecoder.decode() i TextEncoder.encode() nazywają się odpowiednio TextDecoderStream i TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Kompresowanie i rozpakowywanie plików jest łatwe dzięki strumieniom transformacji CompressionStreamDecompressionStream. Przykładowy kod poniżej pokazuje, jak pobrać specyfikację strumieni, skompresować ją (gzip) bezpośrednio w przeglądarce i zapisać skompresowany plik bezpośrednio na dysku.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Interfejs File System Access API FileSystemWritableFileStream oraz eksperymentalne strumienie żądań fetch() to przykłady strumieni z możliwością zapisu w praktyce.

Interfejs Serial API intensywnie korzysta z obiektów strumieni do odczytu i zapisu.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Interfejs API WebSocketStream integruje strumienie z interfejsem WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Przydatne materiały

Podziękowania

Ten artykuł został sprawdzony przez Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley i Adam Rice. Posty na blogu Jake'a Archibalda pomogły mi dużo zrozumieć na temat strumieni. Niektóre przykłady kodu zostały zainspirowane eksperymentami użytkownika GitHuba @bellbind, a część tekstu opiera się na dokumentacji MDN na temat strumieni danych. Autorzy standardu Streams wykonali świetną pracę, tworząc te specyfikacje. Obraz główny udostępnił Ryan Lara na Unsplash.