Strumienie – kompleksowy przewodnik

Dowiedz się, jak za pomocą interfejsu Streams API używać czytelnych i możliwych do zapisu strumieni oraz strumieni przekształcania.

Interfejs Streams API umożliwia automatyczny dostęp do strumieni danych otrzymanych przez sieć lub tworzone w inny sposób lokalnie, i przetwarzać je za pomocą JavaScriptu. Strumieniowanie polega na rozbiciu zasobu, który chcesz odebrać, wysłać lub przekształcić na małe kawałki, a następnie przetwarzamy je krok po kroku. Strumieniowe przesyłanie danych przeglądarki i tak otrzymują zasoby, takie jak HTML czy filmy, które mają być wyświetlane na stronach internetowych, Usługa fetch nie była dostępna w języku JavaScript przed wprowadzeniem strumieni w 2015 roku.

Wcześniej, aby przetworzyć określony zasób (np. film lub plik tekstowy), musisz pobrać cały plik, poczekać na deserializację do odpowiedniego formatu a potem przetwarzać. Transmisje są dostępne dla: JavaScript, to wszystko się zmienia. Możesz teraz przetwarzać nieprzetworzone dane za pomocą JavaScriptu stopniowo w miarę gdy tylko będzie dostępny u klienta, bez konieczności generowania bufora, ciągu znaków czy obiektu blob. Odblokowuje to szereg przypadków użycia. Niektóre z nich wymieniam poniżej:

  • Efekty wideo: przytaczanie czytelnego strumienia wideo przez strumień przekształcenia, który stosuje efekty. w czasie rzeczywistym.
  • (de)kompresja danych: przytaczanie strumienia plików przez strumień przekształcenia, który wybiórczo (de) kompresuje go.
  • Dekodowanie obrazów: potok odpowiedzi HTTP przez strumień przekształcenia, który dekoduje bajty. do formatu bitmapy, a potem przez inny strumień przekształcenia, który przekształca mapy bitowe na pliki PNG. Jeśli zainstalowane w module obsługi fetch skryptu service worker, umożliwia to przejrzystość kodu polyfill takich jak AVIF.

Obsługa przeglądarek

ReadableStream i WritableStream

Obsługa przeglądarek

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1

Źródło

TransformStream

Obsługa przeglądarek

  • Chrome: 67.
  • Krawędź: 79.
  • Firefox: 102.
  • Safari: 14.1

Źródło

Podstawowe pojęcia

Zanim przejdziemy do szczegółowych informacji o różnych typach transmisji, przedstawię kilka podstawowych pojęć.

Kawałki

Fragment to pojedynczy fragment danych zapisywany w strumieniu lub z niego odczytywany. Może to być dowolny type; strumienie mogą zawierać nawet fragmenty różnych typów. Przeważnie fragment nie będzie najbardziej atomowy, dla danego strumienia. Na przykład strumień bajtów może zawierać fragmenty składające się z 16 KiB jednostek Uint8Array zamiast pojedynczych bajtów.

Strumienie do odczytu

Czytelny strumień reprezentuje źródło danych, z którego można odczytywać dane. Inaczej mówiąc, dane mogą w czytelny strumień. Czytelny strumień to instancja instancji ReadableStream zajęcia.

Strumienie z możliwością zapisu

Strumień z możliwością zapisu reprezentuje miejsce docelowe danych, w których możesz je zapisywać. Inaczej mówiąc, dane przekazuje się do strumienia możliwego do zapisu. Strumień możliwy do zapisu to przykład WritableStream zajęcia.

Strumienie przekształcania

Strumień przekształcenia składa się z pary strumieni: strumienia, który można zapisywać, nazywanego stroną z możliwością zapisu. i czytelna strona. Prawdziwą metaforą byłaby tłumacz symultaniczny który na bieżąco tłumaczy z jednego języka na inny. W sposób charakterystyczny dla strumienia przekształcania, powoduje udostępnienie nowych danych do odczytu czytelna strona. Oznacza to, że każdy obiekt z właściwością writable i readable może wyświetlać reklamy jako strumień przekształcenia. Standardowa klasa TransformStream ułatwia jednak tworzenie która jest odpowiednio zaplątana.

Łańcuchy potoków

Strumienie są używane przede wszystkim przez połączenie ich ze sobą. Czytelny strumień można dodać bezpośrednio do potoku do strumienia z możliwością zapisu, używając metody pipeTo() zrozumiałego strumienia. Można też przekazać go przy użyciu lub więcej strumieni przekształcenia, korzystając z metody pipeThrough() zrozumiałego strumienia. Zestaw strumienie połączone potokiem w ten sposób nazywamy łańcuchem rur.

Ciśnienie wsteczne

Po utworzeniu łańcucha potoków zostaną przesłane sygnały dotyczące szybkości przepływu fragmentów i przechodzić przez niego. Jeśli dowolny krok w łańcuchu nie może jeszcze przyjmować fragmentów, przesyła sygnał wstecz w łańcuchu rur, aż w końcu ukaże się, że pierwotne źródło nie generuje już fragmentów, szybko. Ten proces normalizowania przepływu jest nazywany obciążeniem wstecznym.

Rozgrywka

Czytelny strumień można powiązać (nazwając go zgodnie z kształtem wielkiej litery „T”) przy użyciu metody tee(). Spowoduje to zablokowanie strumienia, czyli nie będzie już można z niej korzystać bezpośrednio. ale utworzy 2 nowe strumienie, nazywane gałęziami, które mogą być wykorzystywane niezależnie. Odtwarzanie jest też ważne, ponieważ transmisji nie można przewinąć ani ponownie uruchomić. Więcej informacji na ten temat znajdziesz później.

Schemat łańcucha potoku zawierającego czytelny strumień pochodzący z wywołania do interfejsu API pobierania, który jest następnie przekazywany potokiem przez strumień przekształcenia, którego dane wyjściowe są połączone, a następnie wysyłane do przeglądarki w celu pierwszego gotowego, czytelnego strumienia i do pamięci podręcznej skryptu service worker dla drugiego takiego strumienia, który można odczytać.
Łańcuch rur.

Mechanika czytelnego strumienia

Czytelny strumień to źródło danych reprezentowane w JavaScripcie przez tag ReadableStream obiektem, który i spływają z podstawowego źródła. ReadableStream() konstruktor tworzy i zwraca czytelny obiekt strumienia z podanych modułów obsługi. Dostępne są 2 typy bazowego źródła:

  • Źródła push nieustannie przesyłają Ci dane, gdy masz do nich dostęp. Ty decydujesz, rozpocząć, wstrzymać lub anulować dostęp do transmisji. Mogą to być na przykład transmisje wideo na żywo, zdarzenia wysłane przez serwer, czyli WebSockets.
  • Pobieranie źródeł wymaga jawnego żądania danych z tych źródeł po połączeniu z nimi. Przykłady uwzględnia operacje HTTP za pomocą wywołań funkcji fetch() lub XMLHttpRequest.

Strumieniowe dane są odczytywane sekwencyjnie w postaci małych fragmentów. Fragmenty umieszczone w strumieniu są umieszczone w kolejce. To oznacza, że czekają w kolejce gotowe do przeczytania. Wewnętrzna kolejka przechowuje fragmenty, które nie zostały jeszcze przeczytane.

Strategia kolejkowania to obiekt określający sposób, w jaki strumień powinien sygnalizować obciążenie wsteczne na podstawie o stanie jej wewnętrznej kolejki. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce do określonej liczby, czyli do wysokiego znaku wodnego.

Fragmenty wewnątrz strumienia są odczytywane przez czytelnika. Ten czytnik pobiera dane po jednym fragmencie co pozwala wykonać dowolne operacje. Czytelnik plus drugi wraz z nim kod przetwarzania jest nazywany konsumentem.

Następny konstrukt w tym kontekście to kontroler. Każdy możliwy do odczytania strumień ma powiązane który, jak sama nazwa wskazuje, umożliwia sterowanie strumieniem.

Tylko 1 czytelnik może w danym momencie odczytać strumień. gdy użytkownik zostanie utworzony i zacznie czytać strumień. (czyli staje się aktywnym czytnikiem), jest na nim zablokowany. Jeśli chcesz, aby inny czytelnik przejął kontrolę gdy czytasz strumień, musisz zwykle zwolnić pierwszego czytelnika, zanim wykonasz inne czynności. (ale możesz oglądać transmisje).

Tworzenie czytelnego strumienia

Generując czytelny strumień, wywołując jego konstruktor ReadableStream() Konstruktor ma opcjonalny argument underlyingSource, który reprezentuje obiekt. z metodami i właściwościami, które określają, jak będzie zachowywać się utworzona instancja strumienia.

underlyingSource

Mogą to być następujące opcjonalne metody zdefiniowane przez programistę:

  • start(controller): wywoływana natychmiast po utworzeniu obiektu. metoda może uzyskać dostęp do źródła strumienia i wykonać inne czynności wymagane do skonfigurowania funkcji strumieniowania. Jeśli ten proces ma zostać wykonany asynchronicznie, metoda może zwracają obietnicę sukcesu lub porażki. Parametr controller przekazywany do tej metody to w ReadableStreamDefaultController
  • pull(controller): może służyć do sterowania strumieniem podczas pobierania kolejnych fragmentów. it jest wywoływana wielokrotnie, dopóki wewnętrzna kolejka fragmentów strumienia nie będzie pełna, aż do momentu osiąga wysoki znak wodny. Jeśli wynik wywołania funkcji pull() jest obiecujący, Usługa pull() nie zostanie wywołana ponownie, dopóki ta obietnica nie zostanie zrealizowana. Jeśli obietnica zostanie odrzucona, podczas transmisji wystąpi błąd.
  • cancel(reason): wywołanie, gdy konsument anuluje transmisję.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController obsługuje te metody:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Drugim, podobnie opcjonalnym argumentem konstruktora ReadableStream(), jest queuingStrategy. Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania strumienia, która składa się z dwóch parametry:

  • highWaterMark: liczba nieujemna wskazująca na wysoki znak wodny strumienia przy użyciu tej strategii kolejkowania.
  • size(chunk): funkcja, która oblicza i zwraca skończony, nieujemny rozmiar danej wartości fragmentu. Wynik służy do określania naprężenia wstecznego, które pojawia się za pomocą odpowiedniej właściwości ReadableStreamDefaultController.desiredSize. Określa też to, kiedy wywoływana jest metoda pull() źródła.
.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Metody getReader() i read()

Aby czytać z czytelnego strumienia, potrzebujesz czytnika, który będzie ReadableStreamDefaultReader Metoda getReader() interfejsu ReadableStream tworzy czytnik i blokuje strumień . Gdy strumień jest zablokowany, nie można pozyskać żadnego innego czytnika, dopóki nie zostanie on zwolniony.

read() metoda interfejsu ReadableStreamDefaultReader zwraca obietnicę dającą dostęp do kolejnej w wewnętrznej kolejce strumienia. Wypełnia lub odrzuca z wynikiem w zależności od stanu w transmisji na żywo. Istnieją następujące możliwości:

  • Jeśli fragment jest dostępny, obietnica zostanie zrealizowana za pomocą obiektu formularza
    { value: chunk, done: false }
  • Jeśli strumień zostanie zamknięty, obietnica zostanie zrealizowana przez obiekt formularza
    { value: undefined, done: true }
  • Jeśli podczas transmisji wystąpi błąd, obietnica zostanie odrzucona z odpowiednim błędem.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Właściwość locked

Aby sprawdzić, czy możliwy do odczytania strumień jest zablokowany, otwórz jego ReadableStream.locked usłudze.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Przykładowy kod strumienia, który można odczytać

Poniższy przykładowy kod pokazuje wszystkie kroki w praktyce. Najpierw tworzysz ReadableStream, który w Argument underlyingSource (czyli klasa TimestampSource) definiuje metodę start(). Ta metoda informuje właściwość controller strumienia do enqueue() oznacza sygnaturę czasową co sekundę w ciągu 10 sekund. Na koniec informuje kontroler, że ma close() strumień. Spożywasz streamując, tworząc czytnik za pomocą metody getReader() i wywołując funkcję read(), dopóki strumień nie zostanie done

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iteracja asynchroniczna

Sprawdzanie każdej iteracji pętli read(), jeśli strumień jest done, może nie być najwygodniejszym interfejsem API. Na szczęście wkrótce istnieje lepszy sposób wykonywania tych czynności: iteracja asynchroniczna.

for await (const chunk of stream) {
  console.log(chunk);
}

Aby obejść ten problem, obecnie można stosować iterację asynchroniczną, stosując kod polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Tworzenie czytelnego strumienia

Metoda tee() funkcji Interfejs ReadableStream łączy bieżący czytelny strumień, zwracając tablicę dwuelementową zawierające dwie powstałe gałęzie jako nowe instancje ReadableStream. Dzięki temu 2 czytelników może jednocześnie czytać strumień. Można to zrobić na przykład w mechanizmie Service Worker, jeśli chcesz pobrać odpowiedź z serwera i przesłać ją strumieniowo do przeglądarki, ale także przesyłać ją strumieniowo pamięci podręcznej skryptu service worker. Treść odpowiedzi nie może być wykorzystana więcej niż raz, więc potrzebujesz 2 kopii w tym celu. Aby anulować strumień, musisz najpierw anulować obie zawarte w nim gałęzie. Rozpoczynanie transmisji zwykle blokuje się na określony czas, przez co inni czytelnicy nie mogą go zablokować.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Czytelne strumienie bajtów

Dla strumieni reprezentujących bajty dostarczana jest rozszerzona wersja czytelnego strumienia do obsługi i oszczędność bajtów, zwłaszcza przez zminimalizowanie liczby kopii. Strumienie bajtów umożliwiają korzystanie z własnego bufora (BYOB) czytelników do pozyskania. Domyślna implementacja może dawać zakres różnych danych wyjściowych, takich jak jako ciągi lub bufory tablicy w przypadku technologii WebSockets, podczas gdy strumienie bajtów gwarantują wynik w bajtach. Oprócz tego czytelnicy BYOB mają korzyści ze stabilności. To jest ponieważ odłączenie bufora może zagwarantować, że nie zapisze się on dwa razy w tym samym buforze, co pozwala uniknąć wyścigów. Czytniki BYOB mogą zmniejszyć liczbę uruchomień przeglądarki czyszczenia pamięci, ponieważ może ono wykorzystywać bufory.

Tworzenie czytelnego strumienia bajtów

Możesz utworzyć czytelny strumień bajtów, przekazując dodatkowy parametr type do funkcji ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Bazowe źródło zrozumiałego strumienia bajtowego otrzymuje ReadableByteStreamController do manipulacja. Metoda ReadableByteStreamController.enqueue() przyjmuje argument chunk, którego wartość ma status ArrayBufferView. Właściwość ReadableByteStreamController.byobRequest zwraca aktualną Żądanie pull BYOB lub wartość null, jeśli nie ma żadnego. Na koniec ReadableByteStreamController.desiredSize zwraca żądany rozmiar, aby wypełnić wewnętrzną kolejkę kontrolowanego strumienia.

queuingStrategy

Drugim, podobnie opcjonalnym argumentem konstruktora ReadableStream(), jest queuingStrategy. Jest to obiekt, który opcjonalnie definiuje strategię kolejki dla strumienia, która przyjmuje jedną :

  • highWaterMark: nieujemna liczba bajtów wskazująca na wysoki znak wodny strumienia korzystającego z tej strategii kolejkowania. Służy do określania obciążenia wstecznego, które pojawia się za pomocą odpowiedniej właściwości ReadableByteStreamController.desiredSize. Określa też to, kiedy wywoływana jest metoda pull() źródła.
. .

Metody getReader() i read()

Aby uzyskać dostęp do elementu ReadableStreamBYOBReader, ustaw odpowiednio parametr mode: ReadableStream.getReader({ mode: "byob" }) Umożliwia to dokładniejszą kontrolę bufora. aby uniknąć kopii. Aby odczytać strumień bajtów, musisz wywołać ReadableStreamBYOBReader.read(view), gdzie view to ArrayBufferView.

Przykładowy kod strumienia bajtowego, który można odczytać

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Poniższa funkcja zwraca możliwe do odczytania strumienie bajtów, które umożliwiają wydajny odczyt losowo wygenerowana tablica. Zamiast wstępnie określonego rozmiaru fragmentu 1024 próbuje on wypełnić dzięki buforowi dostarczanemu przez programistę, co daje pełną kontrolę.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Mechanika strumienia, który można zapisać

Strumień z możliwością zapisu to miejsce docelowe, w którym możesz zapisywać dane. Jest ono reprezentowane w JavaScripcie przez WritableStream. Ten pełni funkcję abstrakcji „nad ujściem” bazowego ujścia – ujścia wejścia-wyjścia niższego poziomu, nieprzetworzonych danych.

Dane są zapisywane w strumieniu przez zapisujący, po jednym fragmencie. Fragment może zająć w wielu formach, tak jak w przypadku fragmentów w czytniku. Możesz użyć dowolnego kodu fragmenty gotowe do pisania, scenarzysta oraz powiązany z nim kod są nazywane producentem.

Gdy twórca zostaje utworzony i zaczyna pisać w strumieniu (jako aktywny twórca), mówi się, że zablokowanych. W strumieniu z możliwością zapisu może jednocześnie zapisywać tylko jeden twórca. Jeśli chcesz otrzymać kolejną scenarzysty, aby zacząć pisać do swojego strumienia, zwykle musisz go opublikować przed załączeniem wiadomości innego autora.

Wewnętrzna kolejka zawiera informacje o fragmentach, które zostały jeszcze zapisane w strumieniu, ale jeszcze ich nie ma. zostały przetworzone przez bazowe ujście.

Strategia kolejkowania to obiekt określający sposób, w jaki strumień powinien sygnalizować obciążenie wsteczne na podstawie o stanie jej wewnętrznej kolejki. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce do określonej liczby, czyli do wysokiego znaku wodnego.

Ostatni konstrukcja nosi nazwę kontrolera. Każdy strumień możliwy do zapisu ma powiązany kontroler, który umożliwia sterowanie transmisją (np. jej przerwanie).

Tworzenie strumienia z możliwością zapisu

Interfejs WritableStream usługi interfejs Streams API zapewnia standardową abstrakcję zapisu danych strumieniowanych do miejsca docelowego, jako zlew. Ten obiekt ma wbudowane funkcje backendu i kolejki. Tworzysz strumień z możliwością zapisu przez wywoływanie jego konstruktora WritableStream() Zawiera opcjonalny parametr underlyingSink, który reprezentuje obiekt z metodami i właściwościami, które określają, jak będzie zachowywać się utworzona instancja strumienia.

underlyingSink

underlyingSink może zawierać następujące opcjonalne metody zdefiniowane przez programistę. controller do niektórych metod jest parametr WritableStreamDefaultController

  • start(controller): ta metoda jest wywoływana natychmiast po utworzeniu obiektu. treści tej metody powinny mieć na celu uzyskanie dostępu do bazowego ujścia. Jeśli ma to być asynchronicznie, może zwrócić obietnicę sukcesu lub porażki.
  • write(chunk, controller): ta metoda jest wywoływana, gdy nowy fragment danych (określony w parametrze chunk) jest gotowy do zapisu w bazowym ujściu. Może zwrócić obietnicę powodzenie lub niepowodzenie operacji zapisu. Ta metoda zostanie wywołana dopiero po poprzedniej były zapisywane i nigdy po zamknięciu lub przerwaniu strumienia.
  • close(controller): ta metoda jest wywoływana, gdy aplikacja zasygnalizuje, że zakończyła pisanie fragmenty do strumienia. Zawartość powinna wykonywać niezbędne działania, aby zakończyć zapisy w w bazowym ujściu i zwolnić do niego dostęp. Jeśli ten proces jest asynchroniczny, może zwrócić błąd obiecują zasygnalizowanie sukcesu lub porażki. Ta metoda zostanie wywołana dopiero po wszystkich zapisach w kolejce im się udało.
  • abort(reason): ta metoda jest wywoływana, gdy aplikacja zasygnalizuje, że chce nagle się zamknąć. i określić jego stan jako błędu. Narzędzie to może wyczyścić wstrzymane zasoby, Funkcja close(), ale funkcja abort() zostanie wywołana nawet wtedy, gdy zapisy znajdują się w kolejce. Te kawałki zostaną wrzucone w domu. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę sukcesu lub niepowodzenia. Parametr reason zawiera wartość DOMString opisującą, dlaczego strumień został przerwany.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

WritableStreamDefaultController interfejs Streams API reprezentuje kontroler umożliwiający kontrolę stanu WritableStream podczas tworzenia, podczas przesyłania kolejnych fragmentów do pisania lub na końcu pisania. Podczas tworzenia a WritableStream, bazowe ujście otrzymuje odpowiednią wartość WritableStreamDefaultController do manipulacji. Funkcja WritableStreamDefaultController ma tylko 1 metodę: WritableStreamDefaultController.error(), co powoduje, że wszystkie przyszłe interakcje z tym strumieniem dają błąd. Funkcja WritableStreamDefaultController obsługuje również właściwość signal, która zwraca instancję AbortSignal, co umożliwia zatrzymanie operacji WritableStream w razie potrzeby.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Drugim, podobnie opcjonalnym argumentem konstruktora WritableStream(), jest queuingStrategy. Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania strumienia, która składa się z dwóch parametry:

  • highWaterMark: liczba nieujemna wskazująca na wysoki znak wodny strumienia przy użyciu tej strategii kolejkowania.
  • size(chunk): funkcja, która oblicza i zwraca skończony, nieujemny rozmiar danej wartości fragmentu. Wynik służy do określania naprężenia wstecznego, które pojawia się za pomocą odpowiedniej właściwości WritableStreamDefaultWriter.desiredSize.
.

Metody getWriter() i write()

Aby pisać w strumieniu z możliwością zapisu, musisz mieć scenarzystę, który będzie WritableStreamDefaultWriter Metoda getWriter() interfejsu WritableStream zwraca błąd nową instancję WritableStreamDefaultWriter i blokuje strumień do tej instancji. Natomiast strumień jest zablokowany, nie można uzyskać innego zapisującego, dopóki bieżący nie zostanie zwolniony.

write() funkcji WritableStreamDefaultWriter interfejs zapisuje przekazany fragment danych do obiektu WritableStream i jego ujścia, a następnie zwraca obietnica oznaczająca powodzenie lub niepowodzenie operacji zapisu. Pamiętaj, że „sukces” środków zależy od poziomu ich ujścia; może to oznaczać, że fragment został zaakceptowany, nie musi być bezpiecznie zapisana w ostatecznym miejscu docelowym.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Właściwość locked

Aby sprawdzić, czy strumień możliwy do zapisu jest zablokowany, otwórz WritableStream.locked usłudze.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Przykładowy kod strumienia z możliwością zapisu

Poniższy przykładowy kod pokazuje wszystkie kroki w praktyce.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Przypisanie czytelnego strumienia do strumienia z możliwością zapisu

Czytelny strumień może zostać przesłany do strumienia możliwego do zapisu przez pipeTo(). ReadableStream.pipeTo() przekształca wartość bieżącą ReadableStream w dane WritableStream i zwraca obietnica spełniająca się, gdy proces potoku zakończy się pomyślnie, lub odrzuca, jeśli wystąpiły błędy napotkano problem.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Tworzenie strumienia przekształcenia

Interfejs TransformStream interfejsu Streams API reprezentuje zbiór danych, które można przekształcić. Ty utworzyć strumień przekształcenia, wywołując jego konstruktor TransformStream(), który tworzy i zwraca obiektu strumienia przekształcenia z podanych modułów obsługi. Konstruktor TransformStream() akceptuje jako pierwszym argumentem jest opcjonalny obiekt JavaScript reprezentujący obiekt transformer. Takie obiekty mogą metody mogą być użyte w tych przypadkach:

transformer

  • start(controller): ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Zwykle służy do umieszczania w kolejce fragmentów prefiksu za pomocą funkcji controller.enqueue(). Te fragmenty zostaną przeczytane z czytelnej strony, ale nie wymagają żadnych zapisów po stronie możliwej do zapisu. Jeśli pierwsza jest asynchroniczny, ponieważ np. uzyskanie fragmentów prefiksu wymaga pewnego wysiłku, funkcja może zwrócić obietnicę sukcesu lub niepowodzenia; odrzucona obietnica spowoduje błąd . Wszystkie zgłoszone wyjątki zostaną ponownie zgłoszone przez konstruktor TransformStream().
  • transform(chunk, controller): ta metoda jest wywoływana, gdy nowy fragment został pierwotnie zapisany w dostępna do zapisu strona jest gotowa do przekształcenia. Implementacja strumienia gwarantuje, że ta funkcja jest wywoływana dopiero po tym, jak poprzednie przekształcenia zakończą się sukcesem. Funkcja start() nigdy nie miała lub po wywołaniu funkcji flush(). Ta funkcja wykonuje rzeczywistą przekształcenie dla strumienia przekształcania. Może dodać wyniki do kolejki za pomocą funkcji controller.enqueue(). Ten umożliwia zapis pojedynczego fragmentu po stronie możliwej do zapisu. W przypadku otrzymania parametru czytelna strona w zależności od tego, ile razy funkcja controller.enqueue() została wywołana. Jeśli proces transformacji jest asynchroniczne, ta funkcja może zwrócić obietnicę sukcesu lub niepowodzenia do jej przekształcenia. Odrzucona obietnica spowoduje błąd zarówno na czytelnej, jak i zapisie przekształcenia. Jeśli nie podano metody transform(), używane jest przekształcenie tożsamości, które dodaje do kolejki fragmenty niezmienione od strony możliwej do zapisu na czytelną stronę.
  • flush(controller): ta metoda jest wywoływana po tym, jak wszystkie fragmenty zapisane po stronie możliwej do zapisu zostały zostanie przekształcone przez pomyślne przejście przez transform(), a strona do zapisu ma zostać zamknięto. Zwykle służy to do umieszczania w kolejce fragmentów sufiksu po czytelniczej stronie. zostanie zamknięte. Jeśli proces opróżniania jest asynchroniczny, funkcja może zwrócić obietnicę sygnał o powodzeniu lub niepowodzeniu sygnału; wynik zostanie przekazany do urządzenia wywołującego stream.writable.write() Dodatkowo odrzucona obietnica powoduje błędy zarówno w czytelnym, z drugiej strony strumienia. Zgłoszenie wyjątku jest traktowane tak samo jak zwrócenie odrzuconego lub obiecywanie.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Strategie kolejkowania: writableStrategy i readableStrategy

Drugi i trzeci opcjonalny parametr konstruktora TransformStream() są opcjonalne Strategie kolejkowania: writableStrategy i readableStrategy. Są one zdefiniowane zgodnie z Odczytujący i zapisywalny strumień sekcji.

Przykładowy kod strumienia przekształcania

Poniższy przykładowy kod pokazuje prosty strumień przekształcenia w działaniu.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Potokowanie czytelnego strumienia w strumieniu przekształcenia

pipeThrough() metoda interfejsu ReadableStream zapewnia łańcuchowy sposób dostarczania potoku w przypadku bieżącego strumienia za pomocą strumienia przekształcenia lub dowolnej innej możliwej do zapisu/odczytu pary. Przesyłanie strumienia zwykle jest blokowane przez cały czas jej trwania, uniemożliwiając innym czytelnikom zablokowanie go.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Następny przykładowy (nieco zmodyfikowany) kod pokazuje, jak zaimplementować funkcję „krzyki” wersja systemu fetch() w którym cały tekst jest pisany wielkimi literami dzięki zastosowaniu zwróconej obietnicy odpowiedzi jako strumień i używając wielkich liter we fragmencie. Zaletą tego rozwiązania jest to, że nie trzeba czekać, cały dokument, co może mieć ogromne znaczenie przy pracy z dużymi plikami.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Prezentacja

Poniższa demonstracja pokazuje działanie strumieni czytelnych, możliwych do zapisu i przekształconych. Zawiera też przykłady pipeThrough() i pipeTo(), a także pokazuje tee(). Opcjonalnie możesz uruchomić prezentację w osobnym oknie lub wyświetl kodu źródłowego.

Użyteczne strumienie dostępne w przeglądarce

W przeglądarce jest wiele przydatnych strumieni. Możesz łatwo utworzyć ReadableStream z bloba. Blob zwraca metodę stream() interfejsu ReadableStream, który po odczytaniu zwraca dane zawarte w obiekcie blob. Pamiętaj też, że Obiekt File jest konkretnym rodzajem Blob i można go używać w dowolnym kontekście, w jakim może działać obiekt blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Warianty strumienia TextDecoder.decode() i TextEncoder.encode() są nazywane TextDecoderStream i TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Kompresja lub dekompresja pliku jest łatwa dzięki CompressionStream i Strumienie przekształceń: DecompressionStream . Poniższy przykładowy kod pokazuje, jak pobrać specyfikację strumieni i skompresować ją (gzip) bezpośrednio w przeglądarce i zapisać skompresowany plik bezpośrednio na dysku.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Interfejs File System Access API FileSystemWritableFileStream. a eksperymentalne strumienie żądań fetch() są przykłady strumieni dostępnych do zapisu.

Interfejs Serial API często korzysta zarówno z czytelnych, jak i zapisujących strumieni.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Interfejs WebSocketStream API integruje strumienie z interfejsem WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Przydatne materiały

Podziękowania

Ten artykuł zrecenzował(a) Jake Archibald François Beaufort Sam Dutton Mattias Buelens Surma Joe Medley Adam Rice. Posty na blogu Jake'a Archibalda bardzo mi pomogły strumienie. Część przykładowego kodu jest inspirowana przez użytkownika GitHuba eksploracji użytkownika @bellbind oraz opiera się w dużej mierze na Dokumenty internetowe MDN w strumieniach Streams Standard autorzy wykonali ogromną pracę tę specyfikację. Baner powitalny: Ryan Lara w Odchylenie z ekranu.