Dowiedz się, jak używać strumieni do odczytu, zapisu i przekształcania za pomocą interfejsu Streams API.
Interfejs Streams API umożliwia programowy dostęp do strumieni danych otrzymywanych przez sieć lub tworzonych lokalnie w dowolny sposób oraz przetwarzanie ich za pomocą JavaScriptu. Strumieniowanie polega na podzieleniu zasobu, który chcesz otrzymać, wysłać lub przekształcić, na małe części, a następnie przetwarzaniu tych części po kolei. Przeglądarki i tak przesyłają strumieniowo zasoby, takie jak HTML czy filmy, które mają być wyświetlane na stronach internetowych. Jednak ta funkcja nigdy nie była dostępna w JavaScript przed 2015 r., kiedy wprowadzono fetch
.
Wcześniej, jeśli chciałeś przetworzyć jakiś zasób (np. plik wideo lub tekstowy), musiałeś pobrać cały plik, poczekać, aż zostanie on zdeserializowany do odpowiedniego formatu, a następnie go przetworzyć. Dzięki temu, że strumienie są dostępne w JavaScript, wszystko się zmienia. Możesz teraz przetwarzać nieprzetworzone dane w JavaScript stopniowo, gdy tylko staną się dostępne na urządzeniu klienta, bez konieczności generowania bufora, ciągu znaków ani obiektu blob. Umożliwia to wiele zastosowań, z których niektóre wymieniam poniżej:
- Efekty wideo: przesyłanie czytelnego strumienia wideo przez strumień przekształcający, który stosuje efekty w czasie rzeczywistym.
- Kompresja i dekompresja danych: przesyłanie strumienia plików przez strumień przekształcania, który selektywnie kompresuje lub dekompresuje dane.
- Dekodowanie obrazu: przesyłanie strumienia odpowiedzi HTTP przez strumień przekształcający, który dekoduje bajty na dane bitmapy, a następnie przez kolejny strumień przekształcający, który tłumaczy bitmapy na pliki PNG. Jeśli jest zainstalowany w
fetch
obsługi zdarzeń w usłudze Service Worker, umożliwia transparentne uzupełnianie nowych formatów obrazów, takich jak AVIF.
Obsługa przeglądarek
ReadableStream i WritableStream
TransformStream
Podstawowe pojęcia
Zanim przejdę do szczegółów dotyczących różnych typów strumieni, przedstawię kilka podstawowych pojęć.
Kawałki
Fragment to pojedynczy element danych zapisywany w strumieniu lub z niego odczytywany. Może to być dowolny typ. Strumienie mogą nawet zawierać fragmenty różnych typów. Zwykle fragment nie jest najmniejszą jednostką danych w danym strumieniu. Na przykład strumień bajtów może zawierać fragmenty składające się z jednostek o rozmiarze 16 KiBUint8Array
zamiast pojedynczych bajtów.
Strumienie do odczytu
Strumień odczytu reprezentuje źródło danych, z którego można odczytywać informacje. Innymi słowy, dane wypływają ze strumienia odczytu. Strumień odczytu to konkretnie instancja ReadableStream
klasy.
Strumienie z możliwością zapisu
Strumień z możliwością zapisu to miejsce docelowe danych, w którym możesz zapisywać informacje. Inaczej mówiąc, dane trafiają do strumienia z możliwością zapisu. Strumień zapisywalny to instancja klasy WritableStream
.
Przekształcanie strumieni
Strumień przekształcający składa się z pary strumieni: strumienia do zapisu, zwanego stroną do zapisu, i strumienia do odczytu, zwanego stroną do odczytu.
W realnym świecie można to porównać do tłumacza symultanicznego, który tłumaczy z jednego języka na drugi na bieżąco.
W przypadku strumienia przekształcającego zapisywanie po stronie zapisywalnej powoduje udostępnienie nowych danych do odczytu po stronie odczytywalnej. Każdy obiekt z właściwościami writable
i readable
może służyć jako strumień transformacji. Jednak standardowa klasa TransformStream
ułatwia tworzenie takiej pary, która jest prawidłowo splątana.
Łańcuchy do rur
Strumienie są używane głównie do przekazywania ich do siebie. Strumień do odczytu można przekierować bezpośrednio do strumienia do zapisu za pomocą metody pipeTo()
strumienia do odczytu lub przekierować najpierw przez co najmniej 1 strumień przekształcający za pomocą metody pipeThrough()
strumienia do odczytu. Zestaw połączonych w ten sposób strumieni nazywa się łańcuchem potoków.
Ciśnienie zwrotne
Po utworzeniu łańcucha potoków będzie on przekazywać sygnały dotyczące tego, jak szybko powinny przez niego przepływać fragmenty. Jeśli którykolwiek krok w łańcuchu nie może jeszcze zaakceptować fragmentów, przesyła sygnał wstecz przez łańcuch potoku, aż w końcu oryginalne źródło otrzyma polecenie, aby nie produkować fragmentów tak szybko. Ten proces normalizacji przepływu jest nazywany ograniczeniem przepustowości.
Teeing
Strumień odczytu można rozdzielić (nazwa pochodzi od kształtu wielkiej litery „T”) za pomocą metody tee()
.
Spowoduje to zablokowanie strumienia, czyli uniemożliwi jego bezpośrednie użycie. Utworzy jednak 2 nowe strumienie, zwane gałęziami, które można wykorzystywać niezależnie.
Teeing jest też ważny, ponieważ strumieni nie można przewijać ani ponownie uruchamiać. Więcej informacji na ten temat znajdziesz w dalszej części artykułu.
Mechanizm strumienia odczytu
Strumień do odczytu to źródło danych reprezentowane w JavaScript przez obiekt ReadableStream
, który przepływa z bazowego źródła. Konstruktor
ReadableStream()
tworzy i zwraca obiekt strumienia do odczytu na podstawie podanych modułów obsługi. Istnieją 2 rodzaje źródeł danych:
- Źródła typu push stale przesyłają dane, gdy uzyskasz do nich dostęp. To Ty decydujesz, czy chcesz rozpocząć, wstrzymać czy anulować dostęp do strumienia. Przykłady to strumieniowe transmisje wideo na żywo, zdarzenia wysyłane przez serwer lub WebSockets.
- Źródła typu pull wymagają, aby po nawiązaniu połączenia z nimi wyraźnie poprosić o dane. Przykłady obejmują operacje HTTP za pomocą wywołań
fetch()
lubXMLHttpRequest
.
Dane strumieniowe są odczytywane sekwencyjnie w małych częściach zwanych fragmentami. Fragmenty umieszczane w strumieniu są dodawane do kolejki. Oznacza to, że czekają w kolejce na odczytanie. Wewnętrzna kolejka śledzi fragmenty, które nie zostały jeszcze odczytane.
Strategia kolejkowania to obiekt, który określa, jak strumień powinien sygnalizować nadmierne obciążenie na podstawie stanu wewnętrznej kolejki. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce z określoną liczbą, zwaną wysokim poziomem.
Fragmenty w strumieniu są odczytywane przez czytnik. Czytnik pobiera dane po jednym bloku, co pozwala wykonywać na nich dowolne operacje. Czytnik wraz z kodem przetwarzania nazywa się konsumentem.
Kolejny element w tym kontekście to kontroler. Każdy strumień do odczytu ma powiązany kontroler, który, jak sama nazwa wskazuje, umożliwia sterowanie strumieniem.
W danym momencie strumień może odczytywać tylko jeden czytnik. Gdy czytnik zostanie utworzony i zacznie odczytywać strumień (czyli stanie się aktywnym czytnikiem), zostanie z nim zablokowany. Jeśli chcesz, aby inny czytnik przejął odczytywanie strumienia, zwykle musisz zwolnić pierwszy czytnik, zanim zrobisz cokolwiek innego (chociaż możesz rozdzielić strumienie).
Tworzenie strumienia do odczytu
Strumień do odczytu tworzy się przez wywołanie jego konstruktora ReadableStream()
.
Konstruktor ma argument opcjonalny underlyingSource
, który reprezentuje obiekt z metodami i właściwościami określającymi sposób działania utworzonej instancji strumienia.
underlyingSource
Może to wykorzystywać te opcjonalne metody zdefiniowane przez dewelopera:
start(controller)
: wywoływana natychmiast po utworzeniu obiektu. Metoda może uzyskać dostęp do źródła strumienia i wykonać wszelkie inne czynności wymagane do skonfigurowania funkcji strumienia. Jeśli ten proces ma być wykonywany asynchronicznie, metoda może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Parametrcontroller
przekazywany do tej metody toReadableStreamDefaultController
.pull(controller)
: może służyć do sterowania strumieniem w miarę pobierania kolejnych fragmentów. Jest wywoływana wielokrotnie, dopóki wewnętrzna kolejka fragmentów strumienia nie jest pełna, aż do osiągnięcia przez nią wysokiego poziomu. Jeśli wynikiem wywołaniapull()
jest obietnica,pull()
nie zostanie ponownie wywołana, dopóki ta obietnica nie zostanie spełniona. Jeśli obietnica zostanie odrzucona, strumień będzie zawierać błędy.cancel(reason)
: wywoływana, gdy odbiorca strumienia anuluje strumień.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
obsługuje te metody:
ReadableStreamDefaultController.close()
zamyka powiązany strumień.ReadableStreamDefaultController.enqueue()
umieszcza dany fragment w powiązanym strumieniu.ReadableStreamDefaultController.error()
powoduje, że wszystkie przyszłe interakcje z powiązanym strumieniem będą generować błędy.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Drugi, również opcjonalny, argument konstruktora ReadableStream()
to queuingStrategy
.
Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania strumienia. Przyjmuje 2 parametry:
highWaterMark
: nieujemna liczba wskazująca maksymalny poziom strumienia przy użyciu tej strategii kolejkowania.size(chunk)
: funkcja, która oblicza i zwraca skończony nieujemny rozmiar podanej wartości fragmentu. Wynik jest używany do określania ciśnienia zwrotnego, które jest widoczne we właściwościReadableStreamDefaultController.desiredSize
. Określa też, kiedy wywoływana jest metodapull()
źródła bazowego.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Metody getReader()
i read()
Aby odczytać dane ze strumienia do odczytu, potrzebujesz czytnika, który będzie obiektem ReadableStreamDefaultReader
.
Metoda getReader()
interfejsu ReadableStream
tworzy czytnik i blokuje strumień. Gdy strumień jest zablokowany, nie można uzyskać dostępu do innego czytnika, dopóki ten nie zostanie zwolniony.
Metoda read()
interfejsu ReadableStreamDefaultReader
zwraca obietnicę zapewniającą dostęp do następnego fragmentu w wewnętrznej kolejce strumienia. Zwraca wynik lub odrzuca go w zależności od stanu strumienia. Możliwe są te opcje:
- Jeśli fragment jest dostępny, obietnica zostanie spełniona za pomocą obiektu w formacie
{ value: chunk, done: false }
. - Jeśli strumień zostanie zamknięty, obietnica zostanie spełniona za pomocą obiektu w formacie
{ value: undefined, done: true }
. - Jeśli strumień zwróci błąd, obietnica zostanie odrzucona z odpowiednim błędem.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
Usługa locked
Możesz sprawdzić, czy strumień do odczytu jest zablokowany, uzyskując dostęp do jego właściwości ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Przykładowy kod strumienia do odczytu
Poniższy przykładowy kod pokazuje wszystkie kroki w działaniu. Najpierw utwórz ReadableStream
, która w argumencie underlyingSource
(czyli w klasie TimestampSource
) definiuje metodę start()
.
Ta metoda przekazuje do strumienia controller
enqueue()
sygnaturę czasową co sekundę przez 10 sekund.
Na koniec informuje kontroler, aby close()
strumień. Ten strumień możesz wykorzystać, tworząc czytnik za pomocą metody getReader()
i wywołując read()
, dopóki strumień nie zostanie done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iteracja asynchroniczna
Sprawdzanie, czy strumień jest done
, w każdej iteracji pętli read()
może nie być najwygodniejszym interfejsem API.
Na szczęście wkrótce pojawi się lepszy sposób na to: iteracja asynchroniczna.
for await (const chunk of stream) {
console.log(chunk);
}
Obecnie obejściem umożliwiającym korzystanie z iteracji asynchronicznej jest zaimplementowanie działania za pomocą polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Rozdzielanie strumienia do odczytu
Metoda tee()
interfejsu ReadableStream
rozdziela bieżący strumień do odczytu i zwraca dwuelementową tablicę zawierającą 2 wynikowe gałęzie jako nowe instancje ReadableStream
. Dzięki temu 2 czytniki mogą odczytywać strumień jednocześnie. Możesz to zrobić na przykład w skrypcie service worker, jeśli chcesz pobrać odpowiedź z serwera i przesłać ją strumieniowo do przeglądarki, ale też do pamięci podręcznej skryptu service worker. Ciała odpowiedzi nie można użyć więcej niż raz, więc potrzebujesz 2 kopii. Aby anulować strumień, musisz anulować oba powstałe rozgałęzienia. Utworzenie kopii strumienia
zwykle blokuje go na cały czas trwania, uniemożliwiając innym czytelnikom zablokowanie go.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Strumienie bajtów do odczytu
W przypadku strumieni reprezentujących bajty udostępniana jest rozszerzona wersja strumienia do odczytu, która umożliwia wydajne przetwarzanie bajtów, w szczególności przez minimalizowanie kopii. Strumienie bajtów umożliwiają pozyskiwanie czytelników, którzy korzystają z własnego bufora (BYOB). Domyślna implementacja może generować różne dane wyjściowe, np. ciągi znaków lub bufory tablicowe w przypadku WebSocketów, natomiast strumienie bajtów gwarantują dane wyjściowe w postaci bajtów. Czytniki BYOB mają też zalety związane ze stabilnością. Dzieje się tak, ponieważ jeśli bufor zostanie odłączony, można zagwarantować, że nie będzie można zapisać w nim danych dwukrotnie, co pozwala uniknąć sytuacji wyścigu. Czytniki BYOB mogą zmniejszyć liczbę uruchomień odśmiecania przez przeglądarkę, ponieważ mogą ponownie wykorzystywać bufory.
Tworzenie czytelnego strumienia bajtów
Czytelny strumień bajtów możesz utworzyć, przekazując dodatkowy parametr type
do konstruktora ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
Podstawowe źródło czytelnego strumienia bajtów jest oznaczone symbolem ReadableByteStreamController
, który umożliwia manipulowanie nim. Jej metoda ReadableByteStreamController.enqueue()
przyjmuje argument chunk
, którego wartością jest ArrayBufferView
. Właściwość ReadableByteStreamController.byobRequest
zwraca bieżące żądanie ściągnięcia BYOB lub wartość null, jeśli nie ma takiego żądania. Na koniec właściwość ReadableByteStreamController.desiredSize
zwraca żądany rozmiar, aby wypełnić wewnętrzną kolejkę kontrolowanego strumienia.
queuingStrategy
Drugi, również opcjonalny, argument konstruktora ReadableStream()
to queuingStrategy
.
Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania strumienia. Przyjmuje on jeden parametr:
highWaterMark
: nieujemna liczba bajtów wskazująca górny próg strumienia korzystającego z tej strategii kolejkowania. Służy to do określania nadmiernego obciążenia, które objawia się za pomocą odpowiedniej właściwościReadableByteStreamController.desiredSize
. Określa też, kiedy wywoływana jest metodapull()
źródła bazowego.
Metody getReader()
i read()
Następnie możesz uzyskać dostęp do ReadableStreamBYOBReader
, odpowiednio ustawiając parametr mode
:
ReadableStream.getReader({ mode: "byob" })
. Umożliwia to precyzyjniejszą kontrolę nad przydzielaniem bufora, aby uniknąć kopii. Aby odczytać dane ze strumienia bajtów, musisz wywołać funkcję ReadableStreamBYOBReader.read(view)
, gdzie view
jest obiektem ArrayBufferView
.
Przykładowy kod czytelnego strumienia bajtów
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Ta funkcja zwraca czytelne strumienie bajtów, które umożliwiają wydajne odczytywanie bez kopiowania losowo wygenerowanej tablicy. Zamiast używać z góry określonego rozmiaru fragmentu wynoszącego 1024 bajtów, próbuje wypełnić bufor dostarczony przez dewelopera, co zapewnia pełną kontrolę.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Mechanizm strumienia zapisu
Strumień zapisu to miejsce docelowe, do którego możesz zapisywać dane. W JavaScript jest on reprezentowany przez obiekt WritableStream
. Jest to abstrakcja podstawowego ujścia, czyli ujścia wejścia/wyjścia niższego poziomu, do którego zapisywane są dane pierwotne.
Dane są zapisywane w strumieniu za pomocą zapisywarki, po jednym bloku naraz. Fragment może przybierać wiele form, podobnie jak fragmenty w czytniku. Możesz użyć dowolnego kodu, aby przygotować fragmenty gotowe do napisania. Osoba pisząca i powiązany z nią kod to producent.
Gdy moduł zapisujący zostanie utworzony i zacznie zapisywać dane w strumieniu (aktywny moduł zapisujący), mówi się, że jest on zablokowany. W danym momencie tylko jeden zapisujący może zapisywać dane w strumieniu z możliwością zapisu. Jeśli chcesz, aby inny autor zaczął pisać do Twojego strumienia, musisz zwykle go zwolnić, a następnie dołączyć do niego innego autora.
Kolejka wewnętrzna śledzi fragmenty, które zostały zapisane w strumieniu, ale nie zostały jeszcze przetworzone przez bazowy odbiornik.
Strategia kolejkowania to obiekt, który określa, jak strumień powinien sygnalizować nadmierne obciążenie na podstawie stanu wewnętrznej kolejki. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce z określoną liczbą, zwaną wysokim poziomem.
Ostateczna konstrukcja jest nazywana kontrolerem. Każdy strumień z możliwością zapisu ma powiązany kontroler, który umożliwia sterowanie strumieniem (np. przerwanie go).
Tworzenie strumienia zapisu
Interfejs WritableStream
interfejsu Streams API zapewnia standardową abstrakcję do zapisywania danych strumieniowych w miejscu docelowym, zwanym odbiornikiem. Ten obiekt ma wbudowane funkcje ciśnienia zwrotnego i kolejkowania. Strumień do zapisu tworzy się przez wywołanie jego konstruktora WritableStream()
.
Ma opcjonalny parametr underlyingSink
, który reprezentuje obiekt z metodami i właściwościami określającymi sposób działania utworzonej instancji strumienia.
underlyingSink
underlyingSink
może zawierać te opcjonalne metody zdefiniowane przez dewelopera: Parametr controller
przekazywany do niektórych metod jest obiektem WritableStreamDefaultController
.
start(controller)
: ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Treść tej metody powinna umożliwiać uzyskanie dostępu do bazowego miejsca docelowego. Jeśli ten proces ma być wykonywany asynchronicznie, może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie.write(chunk, controller)
: ta metoda jest wywoływana, gdy nowy fragment danych (określony w parametrzechunk
) jest gotowy do zapisania w bazowym miejscu docelowym. Może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie operacji zapisu. Ta metoda jest wywoływana tylko po pomyślnym zakończeniu poprzednich operacji zapisu i nigdy po zamknięciu lub przerwaniu strumienia.close(controller)
: ta metoda zostanie wywołana, jeśli aplikacja zasygnalizuje, że zakończyła zapisywanie fragmentów w strumieniu. Zawartość powinna wykonać wszystkie niezbędne czynności, aby zakończyć zapisywanie w podstawowym miejscu docelowym i zwolnić do niego dostęp. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Ta metoda zostanie wywołana dopiero po pomyślnym zakończeniu wszystkich zapisów w kolejce.abort(reason)
: ta metoda zostanie wywołana, jeśli aplikacja zasygnalizuje, że chce nagle zamknąć strumień i przekształcić go w stan błędu. Może czyścić wszystkie przechowywane zasoby, podobnie jakclose()
, aleabort()
będzie wywoływana nawet wtedy, gdy zapisy są w kolejce. Te fragmenty zostaną odrzucone. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Parametrreason
zawieraDOMString
z wyjaśnieniem, dlaczego strumień został przerwany.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Interfejs WritableStreamDefaultController
interfejsu Streams API reprezentuje kontroler umożliwiający sterowanie stanem WritableStream
podczas konfiguracji, w miarę przesyłania kolejnych fragmentów do zapisu lub na końcu zapisu. Podczas tworzenia WritableStream
odpowiedniemu elementowi docelowemu przekazywana jest instancja WritableStreamDefaultController
, którą można manipulować. Obiekt WritableStreamDefaultController
ma tylko jedną metodę:WritableStreamDefaultController.error()
, która powoduje, że wszystkie przyszłe interakcje z powiązaną transmisją kończą się błędem.
WritableStreamDefaultController
obsługuje też właściwość signal
, która zwraca instancję AbortSignal
, umożliwiając w razie potrzeby zatrzymanie operacji WritableStream
.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Drugi, również opcjonalny, argument konstruktora WritableStream()
to queuingStrategy
.
Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania strumienia. Przyjmuje 2 parametry:
highWaterMark
: nieujemna liczba wskazująca maksymalny poziom strumienia przy użyciu tej strategii kolejkowania.size(chunk)
: funkcja, która oblicza i zwraca skończony nieujemny rozmiar podanej wartości fragmentu. Wynik jest używany do określania ciśnienia zwrotnego, które jest widoczne we właściwościWritableStreamDefaultWriter.desiredSize
.
Metody getWriter()
i write()
Aby pisać w strumieniu z możliwością zapisu, musisz mieć moduł zapisujący, który będzieWritableStreamDefaultWriter
. Metoda getWriter()
interfejsu WritableStream
zwraca nową instancję WritableStreamDefaultWriter
i blokuje strumień w tej instancji. Gdy strumień jest zablokowany, nie można uzyskać dostępu do innego zapisu, dopóki bieżący nie zostanie zwolniony.
Metoda write()
interfejsu WritableStreamDefaultWriter
zapisuje przekazany fragment danych w WritableStream
i jego bazowym ujściu, a następnie zwraca obietnicę, która jest spełniana, aby wskazać powodzenie lub niepowodzenie operacji zapisu. Pamiętaj, że to, co oznacza „sukces”, zależy od bazowego odbiornika. Może to oznaczać, że fragment został zaakceptowany, ale niekoniecznie, że został bezpiecznie zapisany w miejscu docelowym.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
Usługa locked
Aby sprawdzić, czy strumień zapisywalny jest zablokowany, otwórz jego właściwość WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Przykładowy kod strumienia zapisu
Poniższy przykładowy kod pokazuje wszystkie kroki w działaniu.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Przekazywanie strumienia do odczytu do strumienia do zapisu
Strumień do odczytu można przekierować do strumienia do zapisu za pomocą metody pipeTo()
strumienia do odczytu.
ReadableStream.pipeTo()
przekazuje bieżący ReadableStream
do danego WritableStream
i zwraca obietnicę, która zostanie spełniona, gdy proces przekazywania zakończy się pomyślnie, lub odrzucona, jeśli wystąpią błędy.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Tworzenie strumienia przekształceń
Interfejs TransformStream
interfejsu Streams API reprezentuje zestaw danych, które można przekształcać. Strumień przekształcający tworzy się przez wywołanie jego konstruktora TransformStream()
, który tworzy i zwraca obiekt strumienia przekształcającego na podstawie podanych funkcji obsługi. Konstruktor TransformStream()
przyjmuje jako pierwszy argument opcjonalny obiekt JavaScript reprezentujący transformer
. Takie obiekty mogą zawierać dowolną z tych metod:
transformer
start(controller)
: ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Zwykle służy do umieszczania w kolejce fragmentów prefiksu za pomocą znakucontroller.enqueue()
. Te fragmenty będą odczytywane ze strony do odczytu, ale nie zależą od żadnych zapisów na stronie do zapisu. Jeśli ten początkowy proces jest asynchroniczny, na przykład dlatego, że wymaga pewnego wysiłku, aby uzyskać fragmenty prefiksu, funkcja może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Odrzucona obietnica spowoduje błąd strumienia. Wszelkie zgłoszone wyjątki zostaną ponownie zgłoszone przez konstruktorTransformStream()
.transform(chunk, controller)
: ta metoda jest wywoływana, gdy nowy fragment pierwotnie zapisany po stronie z możliwością zapisu jest gotowy do przekształcenia. Implementacja strumienia gwarantuje, że ta funkcja zostanie wywołana tylko po pomyślnym zakończeniu poprzednich przekształceń i nigdy przed zakończeniem działania funkcjistart()
ani po wywołaniu funkcjiflush()
. Ta funkcja wykonuje rzeczywistą pracę związaną z przekształcaniem strumienia przekształceń. Może umieszczać wyniki w kolejce za pomocą funkcjicontroller.enqueue()
. Dzięki temu pojedynczy fragment zapisany po stronie z możliwością zapisu może spowodować powstanie zera lub wielu fragmentów po stronie z możliwością odczytu, w zależności od tego, ile razy zostanie wywołana funkcjacontroller.enqueue()
. Jeśli proces przekształcania jest asynchroniczny, ta funkcja może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie przekształcenia. Odrzucona obietnica spowoduje błąd zarówno po stronie odczytu, jak i zapisu strumienia przekształcenia. Jeśli nie podasz żadnejtransform()
metody, użyta zostanie transformacja tożsamości, która umieszcza w kolejce fragmenty bez zmian po stronie zapisu i odczytu.flush(controller)
: ta metoda jest wywoływana po tym, jak wszystkie fragmenty zapisane po stronie zapisu zostaną przekształcone przez pomyślne przejście przeztransform()
, a strona zapisu ma zostać zamknięta. Zwykle służy to do umieszczania w kolejce fragmentów sufiksu po stronie odczytu, zanim ta strona również zostanie zamknięta. Jeśli proces opróżniania jest asynchroniczny, funkcja może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Wynik zostanie przekazany do wywołującego funkcjęstream.writable.write()
. Dodatkowo odrzucona obietnica spowoduje błąd zarówno po stronie odczytu, jak i zapisu strumienia. Wyjątek jest traktowany tak samo jak zwrócenie odrzuconego obiektu Promise.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Strategie kolejkowania writableStrategy
i readableStrategy
Drugi i trzeci opcjonalny parametr konstruktora TransformStream()
to opcjonalne strategie writableStrategy
i readableStrategy
kolejkowania. Są one zdefiniowane w sposób opisany w sekcjach dotyczących strumieni do odczytu i do zapisu.
Przykładowy kod strumienia przekształceń
Poniższy przykładowy kod pokazuje działanie prostego strumienia przekształcającego.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Przesyłanie strumienia do odczytu przez strumień przekształcający
Metoda pipeThrough()
interfejsu ReadableStream
umożliwia łączenie strumieni przez przekazywanie bieżącego strumienia przez strumień przekształcający lub dowolną inną parę strumieni do zapisu i odczytu. Przekierowanie strumienia zwykle blokuje go na czas przekierowania, uniemożliwiając innym czytnikom zablokowanie go.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Następny (trochę naciągany) przykładowy kod pokazuje, jak można wdrożyć „krzykliwą” wersję funkcji fetch()
, która zamienia wszystkie litery na wielkie, wykorzystując zwróconą obietnicę odpowiedzi jako strumień i zamieniając litery na wielkie partiami. Zaletą tego podejścia jest to, że nie musisz czekać na pobranie całego dokumentu, co może mieć ogromne znaczenie w przypadku dużych plików.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Prezentacja
Wersja demonstracyjna poniżej pokazuje działanie strumieni do odczytu, zapisu i przekształcania. Zawiera też przykłady pipeThrough()
i pipeTo()
ciągów znaków | oraz pokazuje tee()
. Możesz opcjonalnie uruchomić wersję demonstracyjną w osobnym oknie lub wyświetlić kod źródłowy.
Przydatne strumienie dostępne w przeglądarce
W przeglądarce jest wbudowanych kilka przydatnych strumieni. Możesz łatwo utworzyć ReadableStream
z obiektu blob. Metoda stream() interfejsu Blob
zwraca ReadableStream
, który po odczytaniu zwraca dane zawarte w obiekcie blob. Pamiętaj też, że obiekt File
to określony rodzaj Blob
, którego można używać w każdym kontekście, w którym można używać obiektu blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Wersje strumieniowe TextDecoder.decode()
i TextEncoder.encode()
są nazywane odpowiednio TextDecoderStream
i TextEncoderStream
.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Kompresowanie i dekompresowanie plików jest łatwe dzięki strumieniom przekształceń CompressionStream
i DecompressionStream
. Poniższy przykładowy kod pokazuje, jak pobrać specyfikację Streams, skompresować ją (gzip) bezpośrednio w przeglądarce i zapisać skompresowany plik bezpośrednio na dysku.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
Interfejs File System Access API i eksperymentalne fetch()
strumienie żądań to przykłady strumieni z możliwością zapisu.FileSystemWritableFileStream
Interfejs Serial API w dużym stopniu korzysta ze strumieni do odczytu i zapisu.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Ostatni interfejs API WebSocketStream
integruje strumienie z interfejsem WebSocket API.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Przydatne materiały
- Specyfikacja strumieni
- Wersje demonstracyjne
- Streams polyfill
- 2016 r. – rok strumieni danych z sieci
- Iteratory i generatory asynchroniczne
- Stream Visualizer
Podziękowania
Ten artykuł został sprawdzony przez Jake’a Archibalda, François Beauforta, Sama Duttona, Mattiasa Buelensa, Surma, Joe Medleya i Adama Rice’a. Posty na blogu Jake’a Archibalda bardzo pomogły mi zrozumieć strumienie. Niektóre przykłady kodu zostały zainspirowane eksploracjami użytkownika GitHuba @bellbind, a fragmenty tekstu w dużej mierze opierają się na dokumentacji MDN Web Docs na temat interfejsu Streams API. Autorzy specyfikacji Streams Standard wykonali świetną pracę. Baner powitalny: Ryan Lara na stronie Unsplash.