Strumienie – kompleksowy przewodnik

Dowiedz się, jak używać strumieni czytelnych, zapisywalnych i przekształcanych za pomocą interfejsu Streams API.

Interfejs Streams API umożliwia programowy dostęp do strumieni danych otrzymanych przez sieć lub utworzonych lokalnie za pomocą dowolnych środków, a także ich przetwarzanie za pomocą kodu JavaScript. Strumieniowe przesyłanie polega na rozbiciu zasobu, który chcesz otrzymać, wysłać lub przekształcić, na małe fragmenty, a następnie przetwarzaniu tych fragmentów bit po bicie. Przesyłanie strumieniowe to coś, co przeglądarki robią zawsze, gdy otrzymują zasoby, takie jak HTML lub filmy, które mają być wyświetlane na stronach internetowych. Jednak ta funkcja nie była nigdy dostępna dla JavaScriptu przed wprowadzeniem w 2015 r. strumieni danych.fetch

Wcześniej, jeśli chciałeś przetworzyć jakiś zasób (np. plik wideo lub tekstowy), musiałeś pobrać cały plik, poczekać, aż zostanie on zdeserializowany do odpowiedniego formatu, a potem przetworzyć go. Dzięki strumieniom dostępnym w JavaScript wszystko się zmienia. Teraz możesz przetwarzać dane nieprzetworzone za pomocą JavaScriptu stopniowo, gdy tylko będą dostępne na kliencie, bez konieczności generowania bufora, ciągu znaków ani bloba. Umożliwia to wiele zastosowań, w tym te wymienione poniżej:

  • Efekty wideo: przesyłanie czytelnego strumienia wideo przez strumień transformacji, który stosuje efekty w czasie rzeczywistym.
  • Kompresja i dekompresja danych: przekierowanie strumienia plików przez strumień transformacji, który selektywnie je kompresuje lub dekompresuje.
  • Dekodowanie obrazu: przekierowanie strumienia odpowiedzi HTTP przez strumień transformacji, który dekoduje bajty na dane bitmapy, a następnie przez inny strumień transformacji, który przekształca bitmapy w pliki PNG. Jeśli jest zainstalowany w obiekcie fetch w ramach usługi, umożliwia to przejrzyste wypełnianie nowych formatów obrazów, takich jak AVIF.

Obsługa przeglądarek

ReadableStream i WritableStream

Obsługa przeglądarek

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Źródło

TransformStream

Obsługa przeglądarek

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Źródło

Podstawowe pojęcia

Zanim omówię szczegółowo różne typy strumieni, przedstawię kilka podstawowych pojęć.

kawałki

Kawałek to pojedynczy element danych zapisywany w strumieniu lub odczytywany z niego. Może to być dowolny typ; strumienie mogą zawierać fragmenty o różnych typach. W większości przypadków nie będzie to najbardziej elementarna jednostka danych w danym strumieniu. Na przykład strumień bajtów może zawierać fragmenty o wielkości 16 KiBUint8Array zamiast pojedynczych bajtów.

Czytelne strumienie

Czytalny strumień to źródło danych, z którego możesz odczytywać dane. Innymi słowy, dane wychodzą z czytelnego strumienia. Strumień tekstowy to konkretnie instancja klasy ReadableStream.

Strumienie z możliwością zapisu

Strumień z możliwością zapisu reprezentuje miejsce docelowe danych, do których możesz zapisywać dane. Inaczej mówiąc, dane przechodzą do strumienia do zapisu. Strumień do zapisu to instancja klasy WritableStream.

Przekształcanie strumieni

Strumień transformacji składa się z 2 strumieni: strumienia do zapisu (czyli strony zapisu) oraz strumienia do odczytu (czyli strony odczytu). W realnym świecie metaforą tego jest tłumacz symultaniczny, który na bieżąco tłumaczy z jednego języka na inny. W sposób specyficzny dla strumienia transformacji zapisywanie na stronie z możliwością zapisu powoduje, że nowe dane są dostępne do odczytu po stronie z możliwością odczytu. Konkretnie każdy obiekt z właściwością writablereadable może służyć jako strumień transformacji. Standardowa klasa TransformStream ułatwia jednak tworzenie par, które są prawidłowo splecione.

Łańcuchy do rur

Strumienie są głównie używane do przesyłania ich do siebie nawzajem. Strumień do odczytu można przekierować bezpośrednio do strumienia do zapisu, używając metody pipeTo() strumienia do odczytu. Można go też przekierować przez jeden lub więcej strumieni transformacji, używając metody pipeThrough() strumienia do odczytu. Zbiór połączonych strumieni nazywamy łańcuchem.

Ciśnienie wsteczne

Po utworzeniu łańcucha rur będzie on rozpowszechniać sygnały dotyczące tego, jak szybko elementy powinny przez niego przepływać. Jeśli którykolwiek z etapów w łańcuchu nie może jeszcze przyjąć fragmentów, rozprzestrzenia on sygnał w przeciwnym kierunku przez cały łańcuch, aż do momentu, gdy pierwotne źródło otrzyma informację o zaprzestaniu tak szybkiego generowania fragmentów. Ten proces normalizacji przepływu nazywa się odwrotnym ciśnieniem.

Teeing

Przepływ danych do odczytu może być rozdzielany (nazwa pochodzi od kształtu litery „T” w wielkiej literze) za pomocą metody tee(). Spowoduje to zablokowanie strumienia, co oznacza, że nie będzie można go używać bezpośrednio. Spowoduje to jednak utworzenie 2 nowych strumieni, zwanych gałęziami, które można odtwarzać niezależnie. Jest to ważne, ponieważ strumieni nie można cofnąć ani ponownie uruchomić. Więcej informacji na ten temat znajdziesz poniżej.

Schemat łańcucha przekierowań składającego się z strumienia danych do odczytu pochodzącego z wywołania interfejsu fetch API, który jest następnie przekierowywany przez strumień danych do przekształcania, którego dane wyjściowe są rozdzielane i wysyłane do przeglądarki (dla pierwszego strumienia danych do odczytu) oraz do pamięci podręcznej service workera (dla drugiego strumienia danych do odczytu).
Sieć.

Mechanika czytelnego strumienia

Czytelny strumień to źródło danych reprezentowane w JavaScriptie przez obiekt ReadableStream, który pochodzi z podstawowego źródła. Konstruktor ReadableStream()tworzy i zwraca obiekt strumienia do odczytu na podstawie podanych modułów obsługi. Istnieją 2 rodzaje źródeł danych:

  • Źródła push stale przesyłają dane po uzyskaniu dostępu. To od Ciebie zależy, czy chcesz rozpocząć, wstrzymać lub anulować dostęp do strumienia. Przykłady to strumienie wideo na żywo, zdarzenia wysyłane przez serwer lub WebSockets.
  • Źródła typu „pull” wymagają, aby po nawiązaniu połączenia z nimi wysłać do nich wyraźne żądanie danych. Przykłady obejmują operacje HTTP za pomocą wywołań fetch() lub XMLHttpRequest.

Dane strumienia są odczytywane sekwencyjnie w mniejszych częściach zwanych fragmentami. Fragmenty umieszczone w strumieniu są dodawane do kolejki. Oznacza to, że są w kolejce i czekają na odczytanie. Kolejka wewnętrzna śledzi fragmenty, które nie zostały jeszcze przeczytane.

Strategia kolejkowania to obiekt, który określa, jak strumień powinien sygnalizować odwrotny nacisk na podstawie stanu kolejki wewnętrznej. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce ze wskazaną liczbą, zwaną wysoką wartością graniczną.

Fragmenty w strumieniu są odczytywane przez czytelnika. Ten czytnik pobiera dane po kawałku, co pozwala na wykonywanie na nich dowolnych operacji. Czytnik wraz z innym kodem przetwarzania, który jest z nim powiązany, jest nazywany konsumentem.

Kolejna konstrukcja w tym kontekście nazywa się kontroler. Każdy czytelny strumień ma powiązany kontroler, który, jak sama nazwa wskazuje, umożliwia sterowanie strumieniem.

Tylko jeden czytnik może odczytywać strumień naraz. Gdy czytnik zostanie utworzony i zacznie odczytywać strumień (czyli stanie się aktywnym czytnikiem), zostaje zablokowany na nim. Jeśli chcesz, aby inna osoba przejęła czytanie strumienia, musisz najpierw zwolnić pierwszego czytelnika (chociaż możesz też rozgałęzić strumień).

Tworzenie czytelnego strumienia

Aby utworzyć strumień do odczytu, wywołaj jego konstruktor: ReadableStream(). Konstruktor ma opcjonalny argument underlyingSource, który reprezentuje obiekt z metodami i właściwościami definiującymi sposób działania utworzonej instancji strumienia.

underlyingSource

Możesz użyć tych opcjonalnych metod zdefiniowanych przez dewelopera:

  • start(controller): jest wywoływana natychmiast po utworzeniu obiektu. Metoda może uzyskiwać dostęp do źródła strumienia i wykonywać inne czynności wymagane do skonfigurowania funkcji strumienia. Jeśli ten proces ma być wykonywany asynchronicznie, metoda może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Parametr controller przekazany do tej metody to a ReadableStreamDefaultController.
  • pull(controller): można go użyć do sterowania strumieniem, gdy pobierane są kolejne fragmenty. Jest ona wywoływana wielokrotnie, dopóki wewnętrzna kolejka fragmentów strumienia nie jest pełna, aż do osiągnięcia maksymalnego poziomu. Jeśli wywołanie funkcji pull() spowoduje obietnicę, funkcja pull() nie zostanie wywołana ponownie, dopóki nie zostanie spełniona obietnica. Jeśli obietnica zostanie odrzucona, strumień będzie zawierał błąd.
  • cancel(reason): jest wywoływany, gdy użytkownik strumienia anuluje strumień.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController obsługuje te metody:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Drugi, również opcjonalny, argument konstruktora ReadableStream() to queuingStrategy. Jest to obiekt, który opcjonalnie definiuje kolejkowanie dla strumienia. Przyjmuje 2 parametry:

  • highWaterMark: nieujemna liczba wskazująca wysoki poziom okna transmisji używającej tej strategii kolejkowania.
  • size(chunk): funkcja, która oblicza i zwraca skończony nieujemny rozmiar danej wartości fragmentu. Wynik służy do określenia ciśnienia zwrotnego, które jest widoczne w odpowiedniej właściwości ReadableStreamDefaultController.desiredSize. Określa ona też, kiedy wywoływana jest metoda pull() źródła danych.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

metody getReader()read();

Aby odczytać strumień do odczytu, potrzebujesz czytnika, którym będzie ReadableStreamDefaultReader. Metoda getReader() interfejsu ReadableStream tworzy czytnik i blokuje strumień. Podczas blokowania strumienia nie można uzyskać dostępu do żadnego innego czytnika, dopóki ten nie zostanie zwolniony.

Metoda read() interfejsu ReadableStreamDefaultReader zwraca obietnicę zapewniającą dostęp do następnego fragmentu w wewnętrznej kolejce strumienia. W zależności od stanu strumienia może ona spełnić lub odrzucić dane. Możliwości są następujące:

  • Jeśli fragment jest dostępny, obietnica zostanie spełniona za pomocą obiektu w formie
    { value: chunk, done: false }.
  • Jeśli strumień zostanie zamknięty, obietnica zostanie spełniona za pomocą obiektu w formie
    { value: undefined, done: true }.
  • Jeśli strumień będzie zawierał błąd, obietnica zostanie odrzucona z odpowiednim komunikatem o błędzie.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Właściwość locked

Aby sprawdzić, czy strumień do odczytu jest zablokowany, otwórz obiekt ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

czytelne przykłady kodu strumienia danych.

Przykładowy kod poniżej pokazuje wszystkie te kroki w akcji. Najpierw tworzysz klasę ReadableStream, która w swoim argumencie underlyingSource (czyli klasie TimestampSource) definiuje metodę start(). Ta metoda informuje controller strumienia, aby wysyłaćenqueue() sygnaturę czasową co sekundę przez 10 sekund. Na koniec przekazuje kontrolerowi polecenie close() strumienia. Odczytujesz ten strumień, tworząc czytnik za pomocą metody getReader() i wywołując read(), aż strumień będzie done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iteracja asynchroniczna

Sprawdzanie w każdej iteracji pętli read(), czy strumień jest done, może nie być najwygodniejszym interfejsem API. Na szczęście wkrótce pojawi się lepszy sposób: iteracja asynchroniczna.

for await (const chunk of stream) {
  console.log(chunk);
}

Aby obecnie korzystać z iteracji asynchronicznej, możesz zaimplementować to zachowanie za pomocą polyfilla.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Odczytywanie strumienia danych

Metoda tee() interfejsu ReadableStream rozdziela bieżący odczytywalny strumień, zwracając tablicę o 2 elementach zawierającą 2 wynikowe gałęzie jako nowe instancje ReadableStream. Dzięki temu 2 czytelnicy mogą jednocześnie czytać strumień. Możesz to zrobić na przykład w skrypcie service worker, jeśli chcesz pobrać odpowiedź z serwera i przesłać ją do przeglądarki, a także do pamięci podręcznej skryptu service worker. Ponieważ treści odpowiedzi nie można wykorzystać więcej niż raz, musisz utworzyć 2 kopie. Aby anulować strumień, musisz anulować oba powstałe rozgałęzienia. Przekierowanie strumienia zablokuje go na czas trwania, uniemożliwiając innym użytkownikom zablokowanie go.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

czytelne strumienie bajtów;

W przypadku strumieni reprezentujących bajty udostępniana jest rozszerzona wersja czytelnego strumienia, która umożliwia efektywne przetwarzanie bajtów, w szczególności poprzez minimalizowanie kopii. Strumienie bajtów umożliwiają nabywanie czytników typu „przynieś własną pamięć podręczną” (BYOB). Domyślna implementacja może dawać różne dane wyjściowe, takie jak ciągi znaków lub tablice buforów w przypadku WebSockets, podczas gdy strumienie bajtów gwarantują dane wyjściowe w postaci bajtów. Ponadto czytelnicy mogą korzystać z dodatkowych funkcji stabilności. Jeśli bufor się odłączy, może to spowodować, że nie zapiszesz dwa razy do tego samego bufora, co zapobiega sytuacjom wyścigu. Czytniki BYOB mogą zmniejszyć liczbę razy, ile przeglądarka musi wykonać zbiórka, ponieważ może ponownie używać buforów.

Tworzenie czytelnego strumienia bajtów

Aby utworzyć czytelny strumień bajtów, możesz przekazać dodatkowy parametr type do konstruktora ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Źródło strumienia bajtów do odczytu ma uprawnienia ReadableByteStreamController do manipulowania. Metoda ReadableByteStreamController.enqueue() przyjmuje argument chunk, którego wartość jest typu ArrayBufferView. Właściwość ReadableByteStreamController.byobRequest zwraca bieżącą prośbę o przechwycenie z repozytorium BYOB lub wartość null, jeśli nie ma takiej prośby. Wreszcie właściwość ReadableByteStreamController.desiredSize zwraca pożądany rozmiar, aby wypełnić kolejkę wewnętrzną kontrolowanego strumienia.

queuingStrategy

Drugi, również opcjonalny, argument konstruktora ReadableStream() to queuingStrategy. Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania strumienia. Przyjmuje on jeden parametr:

  • highWaterMark: nieujemna liczba bajtów wskazująca wysoki poziom strumienia przy użyciu tej strategii kolejkowania. Służy to określeniu ciśnienia wstecznego, które jest widoczne w odpowiedniej właściwości ReadableByteStreamController.desiredSize. Określa ona też, kiedy wywoływana jest metoda pull() źródła danych.

metody getReader()read();

Następnie możesz uzyskać dostęp do ReadableStreamBYOBReader, odpowiednio ustawiając parametr mode: ReadableStream.getReader({ mode: "byob" }). Umożliwia to dokładniejsze kontrolowanie przydziału bufora, aby uniknąć kopiowania. Aby odczytać dane z potoku bajtów, musisz wywołać funkcję ReadableStreamBYOBReader.read(view), gdzie view to ArrayBufferView.

Przykładowy kod odczytujalnego strumienia bajtów

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Podana niżej funkcja zwraca czytelne strumienie bajtów, które umożliwiają wydajne odczytywanie tablicy wygenerowanej losowo bez kopiowania. Zamiast używać z góry określonego rozmiaru fragmentu (1024), próbuje wypełnić bufor dostarczony przez dewelopera, co zapewnia pełną kontrolę.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Mechanika strumienia z możliwością zapisu

Możliwość zapisu w strumieniu to miejsce docelowe, do którego możesz zapisywać dane reprezentowane w JavaScriptzie przez obiekt WritableStream. Jest to abstrakcja na szczycie podstawy odbiornika, czyli odbiornika we/wy na niższym poziomie, do którego zapisywane są dane nieprzetworzone.

Dane są zapisywane w strumieniach za pomocą programu do zapisywania, po jednym kawałku naraz. Kawałek może przybierać wiele form, tak jak kawałki w czytniku. Do wygenerowania fragmentów gotowych do zapisu możesz użyć dowolnego kodu. Takie połączenie skryptu i powiązanego kodu nazywa się producentem.

Gdy nowy pisarz zaczyna pisać do strumienia (aktywny pisarz), mówi się, że jest zablokowany. Do strumienia do zapisu może pisać tylko 1 program. Jeśli chcesz, aby inny pisarz zaczął pisać do Twojego strumienia, musisz go najpierw uwolnić, a potem dołączyć do niego innego pisarza.

Kolejka wewnętrzna śledzi fragmenty zapisane w strumieniach, które nie zostały jeszcze przetworzone przez docelowy odbiornik.

Strategia kolejkowania to obiekt, który określa, jak strumień powinien sygnalizować odwrotny nacisk na podstawie stanu kolejki wewnętrznej. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce ze wskazaną liczbą, zwaną wysoką wartością graniczną.

Ostatnia konstrukcja jest nazywana kontrolerem. Każdy strumień z możliwością zapisu ma powiązany kontroler, który umożliwia sterowanie tym strumieniem (np. jego przerwanie).

Tworzenie strumienia do zapisu

Interfejs WritableStream interfejsu Streams API zapewnia standardową abstrakcję do zapisywania danych strumieniowych w miejscu docelowym, zwanym odbiornikiem. Ten obiekt ma wbudowane zabezpieczenie przed przeciążeniem i kolejkowanie. Strumień do zapisu tworzysz, wywołując jego konstruktor: WritableStream(). Ma on opcjonalny parametr underlyingSink, który reprezentuje obiekt z metodami i właściwościami definiującymi sposób działania utworzonej instancji strumienia.

underlyingSink

underlyingSink może zawierać te opcjonalne metody zdefiniowane przez dewelopera. Parametr controller przekazywany do niektórych metod jest zmienną typu WritableStreamDefaultController.

  • start(controller): ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Zawartość tej metody powinna umożliwiać uzyskanie dostępu do podstawowego odbiornika. Jeśli ten proces ma być wykonywany asynchronicznie, może zwrócić obietnicę, która sygnalizuje powodzenie lub niepowodzenie.
  • write(chunk, controller): ta metoda zostanie wywołana, gdy nowy fragment danych (określony w parametrze chunk) będzie gotowy do zapisania w podstawowym odbiorniku. Może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie operacji zapisu. Ta metoda zostanie wywołana tylko po pomyślnym zapisaniu poprzednich danych, nigdy po zamknięciu lub przerwaniu strumienia.
  • close(controller): ta metoda zostanie wywołana, jeśli aplikacja sygnalizuje, że zakończyła zapisywanie fragmentów do strumienia. Treści powinny zrobić wszystko, co jest konieczne, aby dokończyć zapisywanie do podstawowego odbiornika i udostępnić do niego dostęp. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Ta metoda zostanie wywołana dopiero po pomyślnym zapisaniu wszystkich danych z kolejki.
  • abort(reason): ta metoda zostanie wywołana, jeśli aplikacja sygnalizuje, że chce gwałtownie zamknąć strumień i ustawić go w stanie błędu. Może ona wyczyścić wszystkie zablokowane zasoby, podobnie jak close(), ale metoda abort() zostanie wywołana nawet wtedy, gdy operacje zapisu są w kolejce. Te fragmenty zostaną odrzucone. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę, która sygnalizuje powodzenie lub niepowodzenie. Parametr reason zawiera wartość DOMString, która opisuje, dlaczego transmisja została przerwana.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Interfejs interfejsu Streams API WritableStreamDefaultController reprezentuje kontroler, który umożliwia kontrolowanie stanu WritableStream podczas konfigurowania, gdy przesyłane są kolejne fragmenty do zapisu lub po zakończeniu zapisu. Podczas tworzenia WritableStream do podstawowego odbiornika przekazywany jest odpowiedni obiekt WritableStreamDefaultController, którym można manipulować. WritableStreamDefaultController ma tylko jedną metodę: WritableStreamDefaultController.error(), co powoduje, że wszelkie przyszłe interakcje z powiązanym strumieniem powodują błąd. Obiekt WritableStreamDefaultController obsługuje też właściwość signal, która zwraca instancję AbortSignal, co umożliwia zatrzymanie operacji WritableStream w razie potrzeby.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Drugi, również opcjonalny, argument konstruktora WritableStream() to queuingStrategy. Jest to obiekt, który opcjonalnie definiuje kolejkowanie dla strumienia. Przyjmuje 2 parametry:

  • highWaterMark: nieujemna liczba wskazująca wysoki poziom okna transmisji używającej tej strategii kolejkowania.
  • size(chunk): funkcja, która oblicza i zwraca skończony nieujemny rozmiar danej wartości fragmentu. Wynik służy do określenia ciśnienia zwrotnego, które jest widoczne w odpowiedniej właściwości WritableStreamDefaultWriter.desiredSize.

metody getWriter()write();

Aby zapisywać dane w strumieniach do zapisu, musisz mieć element zapisu, którym jest WritableStreamDefaultWriter. Metoda getWriter() interfejsu WritableStream zwraca nową instancję WritableStreamDefaultWriter i blokuje strumień w tej instancji. Podczas blokowania strumienia nie można dodać żadnego innego pisarza, dopóki nie zostanie zwolniony bieżący.

Metoda write() interfejsu WritableStreamDefaultWriter zapisuje przekazany fragment danych w obiekcie WritableStream i jego docelowym odbiorniku, a następnie zwraca obietnicę, która wskazuje na powodzenie lub niepowodzenie operacji zapisu. Pamiętaj, że znaczenie terminu „sukces” zależy od docelowego miejsca docelowego. Może on oznaczać, że fragment został zaakceptowany, ale niekoniecznie, że został bezpiecznie zapisany w docelowym miejscu docelowym.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Właściwość locked

Aby sprawdzić, czy strumień z możliwością zapisu jest zablokowany, otwórz jego właściwość WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Przykład kodu strumienia z możliwością zapisu

Przykładowy kod poniżej pokazuje wszystkie kroki.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Przesyłanie strumienia do odczytu do strumienia do zapisu

Strumień do odczytu można przekierować do strumienia do zapisu za pomocą metody pipeTo() strumienia do odczytu. ReadableStream.pipeTo() przekazuje bieżącą wartość ReadableStream do podanej wartości WritableStream i zwraca obietnicę, która jest wypełniana po pomyślnym zakończeniu procesu przekazywania lub odrzucana, jeśli wystąpiły błędy.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Tworzenie strumienia przekształcenia

Interfejs TransformStream interfejsu Streams API reprezentuje zestaw danych, które można przekształcać. Strumień transformacji tworzysz, wywołując jego konstruktor TransformStream(), który tworzy i zwraca obiekt strumienia transformacji na podstawie podanych modułów obsługi. Konstruktor TransformStream() przyjmuje jako pierwszy argument opcjonalny obiekt JavaScript reprezentujący transformer. Takie obiekty mogą zawierać dowolną z tych metod:

transformer

  • start(controller): ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Zwykle służy to do umieszczania w kole fragmentów prefiksu za pomocą parametru controller.enqueue(). Te fragmenty będą odczytywane z czytalnej strony, ale nie będą zależeć od zapisów na stronie z możliwością zapisu. Jeśli ten początkowy proces jest asynchroniczny, na przykład dlatego, że wymaga pewnych działań, aby pobrać fragmenty prefiksu, funkcja może zwrócić obietnicę, aby zasygnalizować sukces lub niepowodzenie. Odrzucona obietnica spowoduje błąd strumienia. Konstruktor TransformStream() ponownie rzuca wszystkie wyjątki.
  • transform(chunk, controller): ta metoda jest wywoływana, gdy nowy fragment zapisany pierwotnie po stronie zapisu jest gotowy do przekształcenia. Implementacja strumienia gwarantuje, że ta funkcja będzie wywoływana tylko po pomyślnym wykonaniu poprzednich przekształceń i nigdy przed zakończeniem działania funkcji start() ani po wywołaniu funkcji flush(). Ta funkcja wykonuje właściwą pracę związaną z przekształcaniem danych w strumieniach przekształcania. Może ona umieszczać wyniki w kolejce za pomocą funkcji controller.enqueue(). Dzięki temu pojedynczy fragment zapisany po stronie zapisu może spowodować, że po stronie odczytu nie będzie żadnych fragmentów lub będzie ich kilka, w zależności od tego, ile razy wywołano funkcję controller.enqueue(). Jeśli proces przetwarzania jest asynchroniczny, ta funkcja może zwrócić obietnicę, która sygnalizuje powodzenie lub niepowodzenie przetwarzania. Odrzucona obietnica spowoduje błąd zarówno po stronie odczytu, jak i zapisu strumienia transformacji. Jeśli nie podasz metody transform(), zostanie użyta transformacja tożsamości, która umieszcza w kolejce niezmienione fragmenty z zapisywalnej strony na stronę czytelną.
  • flush(controller): ta metoda jest wywoływana po tym, jak wszystkie fragmenty zapisane po stronie z możliwością zapisu zostały przekształcone przez udane przekazanie przez transform(), a strona z możliwością zapisu ma zostać zamknięta. Zwykle służy to do umieszczania w kolejce fragmentów sufiksów po stronie czytelniczej, zanim zostanie ona zamknięta. Jeśli proces czyszczenia jest asynchroniczny, funkcja może zwrócić obietnicę, aby zasygnalizować sukces lub niepowodzenie. Wynik zostanie przekazany do wywołującego funkcji stream.writable.write(). Odrzucona obietnica spowoduje błąd zarówno po stronie odczytu, jak i zapisu strumienia. Wyjątek jest traktowany tak samo jak odrzucenie obietnicy.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Strategie kolejkowania writableStrategyreadableStrategy

Drugi i trzeci opcjonalny parametr konstruktora TransformStream() to odpowiednio writableStrategyreadableStrategy. Są one zdefiniowane odpowiednio w sekcji dotyczącej strumieni do odczytudo zapisu.

Przykładowy kod przekształcania strumienia

Poniższy przykładowy kod pokazuje działanie prostego przekształcenia strumienia.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Przesyłanie strumienia do odczytu przez strumień do transformacji

Metoda pipeThrough() interfejsu ReadableStream umożliwia łańcuchowe przesyłanie bieżącego strumienia przez strumień przekształcenia lub dowolną inną parę zapisu/odczytu. Przekierowanie strumienia zablokuje go na czas przekierowania, uniemożliwiając innym czytelnikom zablokowanie go.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Następujący przykładowy kod (trochę sztuczny) pokazuje, jak zastosować wersję fetch(), która powoduje pisanie wszystkich znaków tekstu wielką literą, wykorzystując zwróconą obietnicę odpowiedzi jako strumień i zamieniając znaki na wielkie litery po kawałku. Zaletą tego podejścia jest to, że nie musisz czekać na pobranie całego dokumentu, co może mieć ogromne znaczenie w przypadku dużych plików.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Prezentacja

Demonstracja poniżej pokazuje działanie strumieni czytalnych, zapisywalnych i przekształcających. Zawiera też przykłady pipeThrough() i pipeTo() łańcuchów przewodów oraz demonstruje tee(). Opcjonalnie możesz uruchomić demo w osobnym oknie lub wyświetlić kod źródłowy.

Przydatne strumienie dostępne w przeglądarce

W przeglądarce jest wiele przydatnych strumieni danych. Możesz łatwo utworzyć ReadableStream z bloba. Metoda stream() interfejsu Blob zwraca ReadableStream, który po odczycie zwraca dane zawarte w pliku blob. Pamiętaj też, że obiekt File to specyficzny rodzaj obiektu Blob, który można używać w dowolnym kontekście, w którym można używać obiektów blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Wersje strumieniowe TextDecoder.decode() i TextEncoder.encode() nazywają się odpowiednio TextDecoderStream i TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Kompresowanie i rozpakowywanie plików jest proste dzięki strumieniom transformacji CompressionStreamDecompressionStream. Przykładowy kod poniżej pokazuje, jak pobrać specyfikację strumieni, skompresować ją (gzip) bezpośrednio w przeglądarce i zapisać skompresowany plik bezpośrednio na dysku.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Interfejs File System Access API FileSystemWritableFileStream i eksperymentalne strumienie żądań fetch() to przykłady strumieni z możliwością zapisu w praktyce.

Interfejs Serial API intensywnie korzysta zarówno ze strumieni do odczytu, jak i do zapisu.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

I wreszcie interfejs API WebSocketStream integruje strumienie z interfejsem WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Przydatne materiały

Podziękowania

Ten artykuł został sprawdzony przez Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley i Adam Rice. Artykuły w blogu Jake'a Archibalda bardzo mi pomogły w zrozumieniu strumieni. Niektóre przykłady kodu zostały zainspirowane eksperymentami użytkownika GitHuba @bellbind, a część tekstu opiera się na dokumentacji MDN na temat strumieni danych. Autorzy standardu Streams Standard wykonali świetną pracę, tworząc te specyfikacje. Obraz główny udostępnił Ryan Lara na Unsplash.