Dowiedz się, jak używać strumieni odczytu, zapisu i przekształcania za pomocą interfejsu Streams API.
Interfejs Streams API umożliwia programowy dostęp do strumieni danych otrzymanych przez sieć lub utworzonych lokalnie za pomocą dowolnych środków, a także ich przetwarzanie za pomocą kodu JavaScript. Strumieniowanie polega na rozbiciu zasobu, który chcesz otrzymać, wysłać lub przekształcić, na małe fragmenty, a następnie przetwarzaniu tych fragmentów bit po bicie. Chociaż strumieniowanie jest czymś, co przeglądarki robią, gdy otrzymują zasoby, takie jak HTML lub filmy, aby wyświetlać je na stronach internetowych, ta funkcja nigdy nie była dostępna dla JavaScriptu przed wprowadzeniem w 2015 r. strumieniowania.fetch
Wcześniej, jeśli chciałeś przetworzyć jakiś zasób (np. film lub plik tekstowy), musiałeś pobrać cały plik, poczekać, aż zostanie on zdeserializowany do odpowiedniego formatu, a potem przetworzyć go. Wszystko się zmienia, gdy strumienie są dostępne dla JavaScript. Teraz możesz przetwarzać dane nieprzetworzone za pomocą JavaScriptu stopniowo, gdy tylko będą dostępne na kliencie, bez konieczności generowania bufora, ciągu znaków ani bloba. Umożliwia to wiele zastosowań, w tym:
- Efekty wideo: przesyłanie czytelnego strumienia wideo przez strumień transformacji, który stosuje efekty w czasie rzeczywistym.
- (De)kompresowanie danych: przekierowanie strumienia plików przez strumień transformacji, który selektywnie (de)kompresuje dane.
- Dekodowanie obrazu: przekierowanie strumienia odpowiedzi HTTP przez strumień transformacji, który dekoduje bajty na dane bitmapy, a następnie przez inny strumień transformacji, który przekształca bitmapy w pliki PNG. Jeśli jest zainstalowany w obiekcie
fetch
w ramach usługi, umożliwia to przejrzyste wypełnianie nowych formatów obrazów, takich jak AVIF.
Obsługa przeglądarek
ReadableStream i WritableStream
TransformStream
Podstawowe pojęcia
Zanim omówię szczegółowo różne typy strumieni, przedstawię kilka podstawowych pojęć.
Chunks
Kawałek to pojedynczy element danych zapisywany w strumieniu lub z niego odczytywany. Może to być dowolny typ; strumienie mogą zawierać fragmenty różnych typów. W większości przypadków fragment nie będzie najbardziej elementarną jednostką danych w danym strumieniu. Na przykład strumień bajtów może zawierać fragmenty o wielkości 16 KiBUint8Array
zamiast pojedynczych bajtów.
Czytelne strumienie
Czytalny strumień to źródło danych, z którego możesz odczytywać dane. Innymi słowy, dane wychodzą z czytelnego strumienia. Strumień tekstowy to konkretnie instancja klasy ReadableStream
.
Strumienie z możliwością zapisu
Możliwość zapisu w strumieniu oznacza, że jest to miejsce docelowe danych, do których możesz zapisywać. Inaczej mówiąc, dane przechodzą do strumienia do zapisu. Strumień do zapisu to instancja klasy WritableStream
.
Przekształcanie strumieni
Strumień transformacji składa się z 2 strumieni: strumienia do zapisu (zwanego stroną do zapisu) i strumienia do odczytu (zwanego stroną do odczytu).
W realnym świecie metaforą tego jest tłumacz symultaniczny, który na bieżąco tłumaczy z jednego języka na inny.
W sposób specyficzny dla strumienia transformacji zapisywanie na stronie z możliwością zapisu powoduje, że nowe dane są dostępne do odczytu po stronie z możliwością odczytu. Konkretnie dowolny obiekt z właściwością writable
i właściwością readable
może służyć jako strumień transformacji. Standardowa klasa TransformStream
ułatwia jednak tworzenie par, które są prawidłowo splecione.
Łańcuchy do rur
Strumienie są głównie używane do przesyłania ich do siebie nawzajem. Strumień do odczytu można przekierować bezpośrednio do strumienia do zapisu, używając metody pipeTo()
strumienia do odczytu. Można go też przekierować przez jeden lub więcej strumieni transformacji, używając metody pipeThrough()
strumienia do odczytu. Zbiór połączonych strumieni nazywamy łańcuchem.
Ciśnienie wsteczne
Gdy łańcuch przewodów zostanie utworzony, będzie rozpowszechniać sygnały dotyczące tego, jak szybko powinny przepływać przez niego elementy. Jeśli którykolwiek z etapów w łańcuchu nie może jeszcze przyjąć fragmentów, rozprzestrzenia on sygnał w przeciwnym kierunku przez cały łańcuch, aż do momentu, gdy pierwotne źródło otrzyma informację o zaprzestaniu tak szybkiego generowania fragmentów. Ten proces normalizacji przepływu nazywa się odwrotnym ciśnieniem.
Teeing
Przepływ danych do odczytu może być rozdzielany (nazwa pochodzi od kształtu litery „T” w wielkiej literze) za pomocą metody tee()
.
Spowoduje to zablokowanie strumienia, co oznacza, że nie będzie można go używać bezpośrednio. Spowoduje to jednak utworzenie 2 nowych strumieni, zwanych gałęziami, które można odtwarzać niezależnie.
Ważne jest też to, że strumieni nie można cofnąć ani ponownie uruchomić. Więcej informacji na ten temat znajdziesz poniżej.
Mechanika czytelnego strumienia
Czytelny strumień to źródło danych reprezentowane w JavaScriptie przez obiekt ReadableStream
, który pochodzi z podstawowego źródła. Konstruktor ReadableStream()
tworzy i zwraca obiekt strumienia do odczytu na podstawie podanych modułów obsługi. Istnieją 2 rodzaje źródeł danych:
- Źródła push przesyłają dane do Ciebie, gdy tylko uzyskasz do nich dostęp. To od Ciebie zależy, czy chcesz rozpocząć, wstrzymać lub anulować dostęp do strumienia. Przykłady to strumienie wideo na żywo, zdarzenia wysyłane przez serwer lub WebSockets.
- Źródła danych typu „pull” wymagają, aby po nawiązaniu połączenia z nimi wysłać do nich wyraźne żądanie danych. Przykłady obejmują operacje HTTP za pomocą wywołań
fetch()
lubXMLHttpRequest
.
Dane strumienia są odczytywane sekwencyjnie w mniejszych częściach zwanych fragmentami. Fragmenty umieszczone w strumieniu są wstawiane do kolejki. Oznacza to, że są one w kolejce i gotowe do odczytania. Kolejka wewnętrzna śledzi fragmenty, które nie zostały jeszcze przeczytane.
Strategia kolejkowania to obiekt, który określa, jak strumień powinien sygnalizować odwrotny nacisk na podstawie stanu swojej kolejki wewnętrznej. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce ze wskazaną liczbą, zwaną wysoką wartością graniczną.
Fragmenty w strumieniu są odczytywane przez czytnik. Ten czytnik pobiera dane po kawałku, co pozwala na wykonywanie na nich dowolnych operacji. Czytnik wraz z innym kodem przetwarzania, który jest z nim powiązany, jest nazywany konsumentem.
Kolejna konstrukcja w tym kontekście nazywa się kontroler. Każdy odczytywalny strumień ma powiązany kontroler, który, jak sama nazwa wskazuje, umożliwia sterowanie tym strumieniem.
Strumień może odczytywać tylko 1 czytnik naraz. Gdy czytnik zostanie utworzony i zacznie odczytywać strumień (czyli stanie się aktywnym czytnikiem), zostaje zablokowany. Jeśli chcesz, aby inny czytelnik przejął czytanie strumienia, musisz najpierw zwolnić pierwszego czytelnika (chociaż możesz rozgałęzić strumień).
Tworzenie czytelnego strumienia
Aby utworzyć strumień do odczytu, wywołaj jego konstruktor:
ReadableStream()
.
Konstruktor ma opcjonalny argument underlyingSource
, który reprezentuje obiekt z metodami i właściwościami definiującymi zachowanie utworzonej instancji strumienia.
underlyingSource
Może to odbywać się za pomocą tych opcjonalnych metod zdefiniowanych przez deweloperów:
start(controller)
: wywoływana natychmiast po utworzeniu obiektu. Metoda może uzyskać dostęp do źródła strumienia i wykonywać inne czynności wymagane do skonfigurowania funkcji strumienia. Jeśli ten proces ma być wykonywany asynchronicznie, metoda może zwracać obietnicę, aby zasygnalizować sukces lub niepowodzenie. Parametrcontroller
przekazany do tej metody to aReadableStreamDefaultController
.pull(controller)
: można go używać do kontrolowania strumienia, gdy pobierane są kolejne fragmenty. Jest on wywoływany wielokrotnie, dopóki wewnętrzna kolejka fragmentów strumienia nie jest pełna, aż do osiągnięcia najwyższego poziomu. Jeśli wywołanie funkcjipull()
zwróci obietnicę, funkcjapull()
nie zostanie wywołana ponownie, dopóki nie zostanie spełniona obietnicą. Jeśli obietnica zostanie odrzucona, strumień będzie zawierał błąd.cancel(reason)
: jest wywoływany, gdy odbiorca strumienia anuluje strumień.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
obsługuje te metody:
ReadableStreamDefaultController.close()
zamyka powiązany strumień.ReadableStreamDefaultController.enqueue()
wstawia dany fragment do kolejki na powiązanym strumieniu.ReadableStreamDefaultController.error()
spowoduje, że wszystkie przyszłe interakcje z powiązanym strumieniem będą powodować błędy.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Drugi, również opcjonalny, argument konstruktora ReadableStream()
to queuingStrategy
.
Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania dla strumienia. Przyjmuje on 2 parametry:
highWaterMark
: nieujemna liczba wskazująca wysoki poziom ścieżki audio, która korzysta z tej strategii kolejkowania.size(chunk)
: funkcja, która oblicza i zwraca skończony nieujemny rozmiar danej wartości fragmentu. Wynik służy do określania ciśnienia zwrotnego, które jest widoczne w odpowiedniej właściwościReadableStreamDefaultController.desiredSize
. Określa ona też, kiedy wywoływana jest metodapull()
źródła danych.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Metody getReader()
i read()
Aby odczytać strumień do odczytu, potrzebujesz czytnika, którym będzie ReadableStreamDefaultReader
.
Metoda getReader()
interfejsu ReadableStream
tworzy czytnik i blokuje strumień. Podczas blokowania strumienia żaden inny czytnik nie może zostać pozyskany, dopóki ten nie zostanie zwolniony.
Metoda read()
interfejsu ReadableStreamDefaultReader
zwraca obietnicę zapewniającą dostęp do następnego fragmentu w wewnętrznej kolejce strumienia. W zależności od stanu strumienia może ona spełnić lub odrzucić dane. Możliwości są następujące:
- Jeśli fragment jest dostępny, obietnica zostanie spełniona za pomocą obiektu w formie
{ value: chunk, done: false }
. - Jeśli strumień zostanie zamknięty, obietnica zostanie spełniona za pomocą obiektu w formie
{ value: undefined, done: true }
. - Jeśli strumień będzie zawierał błąd, obietnica zostanie odrzucona z odpowiednim komunikatem o błędzie.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
Właściwość locked
Aby sprawdzić, czy strumień do odczytu jest zablokowany, otwórz obiekt ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Czytelne przykłady kodu strumienia
Przykładowy kod poniżej pokazuje wszystkie te kroki w akcji. Najpierw tworzysz klasę ReadableStream
, która w swoim argumencie underlyingSource
(czyli klasie TimestampSource
) definiuje metodę start()
.
Ta metoda informuje controller
strumienia, abyenqueue()
dodawał sygnaturę czasową co sekundę przez 10 sekund.
Na koniec przekazuje kontrolerowi polecenie close()
strumienia. Odczytujesz ten strumień, tworząc czytnik za pomocą metody getReader()
i wywołując read()
, aż strumień będzie done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iteracja asynchroniczna
Sprawdzanie w każdej iteracji pętli read()
, czy strumień jest done
, może nie być najwygodniejszym interfejsem API.
Na szczęście wkrótce pojawi się lepszy sposób: iteracja asynchroniczna.
for await (const chunk of stream) {
console.log(chunk);
}
Aby obecnie korzystać z iteracji asynchronicznej, należy zaimplementować to zachowanie za pomocą polyfilla.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Odgałęzienie strumienia do odczytu
Metoda tee()
interfejsu ReadableStream
rozdziela bieżący odczytywalny strumień, zwracając tablicę o 2 elementach zawierającą 2 wynikowe gałęzie jako nowe instancje ReadableStream
. Dzięki temu 2 czytelnicy mogą jednocześnie czytać ten sam strumień. Możesz to zrobić na przykład w skrypcie service worker, jeśli chcesz pobrać odpowiedź z serwera i przesłać ją strumieniowo do przeglądarki, a także do pamięci podręcznej skryptu service worker. Ponieważ treści odpowiedzi nie można wykorzystać więcej niż raz, musisz utworzyć 2 kopie. Aby anulować strumień, musisz anulować oba powstałe rozgałęzienia. Przekierowanie strumienia zablokuje go na czas trwania, uniemożliwiając innym użytkownikom zablokowanie go.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
czytelne strumienie bajtów;
W przypadku strumieni reprezentujących bajty udostępniana jest rozszerzona wersja czytelnego strumienia, która umożliwia efektywne przetwarzanie bajtów, w szczególności poprzez minimalizowanie kopii. Strumienie bajtów umożliwiają nabywanie czytników typu „przynieś własną pamięć podręczną” (BYOB). Domyślna implementacja może dawać różne dane wyjściowe, np. ciągi znaków lub tablice buforów w przypadku WebSockets, podczas gdy strumienie bajtów gwarantują dane wyjściowe w postaci bajtów. Ponadto czytelnicy mogą korzystać z dodatkowych funkcji stabilności. Jeśli bufor się odłączy, może to spowodować, że nie zapiszesz dwukrotnie do tego samego bufora, co zapobiegnie sytuacjom wyścigu. Czytniki BYOB mogą zmniejszyć liczbę wywołań funkcji zbierania elementów usuniętych z pamięci przez przeglądarkę, ponieważ mogą ponownie używać buforów.
Tworzenie czytelnego strumienia bajtów
Możesz utworzyć czytelny strumień bajtów, przekazując dodatkowy parametr type
do konstruktora ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
Źródło danych strumienia bajtów do odczytu otrzymuje uprawnienia ReadableByteStreamController
do manipulowania. Metoda ReadableByteStreamController.enqueue()
przyjmuje argument chunk
, którego wartość jest typu ArrayBufferView
. Właściwość ReadableByteStreamController.byobRequest
zwraca bieżącą prośbę o przeniesienie do repozytorium BYOB lub wartość null, jeśli nie ma takiej prośby. Wreszcie właściwość ReadableByteStreamController.desiredSize
zwraca pożądany rozmiar, aby wypełnić kolejkę wewnętrzną kontrolowanego strumienia.
queuingStrategy
Drugi, również opcjonalny, argument konstruktora ReadableStream()
to queuingStrategy
.
Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania dla strumienia. Przyjmuje on jeden parametr:
highWaterMark
: nieujemna liczba bajtów wskazująca wysoki poziom strumienia korzystającego z tej strategii kolejkowania. Służy on do określania ciśnienia wstecznego, które jest widoczne w odpowiedniej właściwościReadableByteStreamController.desiredSize
. Określa ona też, kiedy wywoływana jest metodapull()
źródła danych.
Metody getReader()
i read()
Następnie możesz uzyskać dostęp do ReadableStreamBYOBReader
, odpowiednio ustawiając parametr mode
:ReadableStream.getReader({ mode: "byob" })
. Umożliwia to dokładniejszą kontrolę alokacji bufora, aby uniknąć kopiowania. Aby odczytać dane z strumienia bajtów, musisz wywołać funkcję ReadableStreamBYOBReader.read(view)
, gdzie view
to ArrayBufferView
.
Przykładowy kod strumienia bajtów w czytelnej postaci
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Podana niżej funkcja zwraca czytelne strumienie bajtów, które umożliwiają wydajne odczytywanie losowo wygenerowanej tablicy bez kopiowania. Zamiast używać z góry określonego rozmiaru fragmentu (1024), próbuje wypełnić bufor dostarczony przez dewelopera, co zapewnia pełną kontrolę.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Mechanika strumienia umożliwiającego zapis
Możliwość zapisu w strumieniach to miejsce docelowe, do którego możesz zapisywać dane. W języku JavaScript reprezentuje je obiekt WritableStream
. Jest to abstrakcja na szczycie podstawy odbiornika – odbiornika we/wy na niższym poziomie, do którego zapisywane są dane nieprzetworzone.
Dane są zapisywane w strumieniach za pomocą programu do zapisu, po jednym fragmencie na raz. Kawałek może przybierać wiele form, tak jak kawałki w czytniku. Do wygenerowania fragmentów gotowych do zapisania możesz użyć dowolnego kodu. Pisarz wraz z powiązanym kodem to producent.
Gdy nowy skrypt zostanie utworzony i zacznie zapisywać dane do strumienia (aktywny skrypt zapisuje dane), mówimy, że jest zablokowany na potrzeby tego strumienia. Do strumienia do zapisu może pisać tylko 1 program naraz. Jeśli chcesz, aby inny pisarz zaczął pisać do Twojego strumienia, musisz go najpierw uwolnić, a potem dołączyć do niego innego pisarza.
Kolejka wewnętrzna śledzi fragmenty zapisane w strumieniach, które nie zostały jeszcze przetworzone przez docelowy odbiornik.
Strategia kolejkowania to obiekt, który określa, jak strumień powinien sygnalizować odwrotny nacisk na podstawie stanu swojej kolejki wewnętrznej. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce ze wskazaną liczbą, zwaną wysoką wartością graniczną.
Ostateczna konstrukcja nazywa się kontrolerem. Każdy strumień do zapisu ma powiązany kontroler, który umożliwia sterowanie tym strumieniem (np. jego przerwanie).
Tworzenie strumienia do zapisu
Interfejs WritableStream
interfejsu Streams API zapewnia standardową abstrakcję do zapisywania danych strumieniowych w miejscu docelowym, zwanym odbiornikiem. Ten obiekt ma wbudowane zabezpieczenie przed przeciążeniem i kolejkowanie. Strumień do zapisu tworzysz, wywołując jego konstruktor:
WritableStream()
.
Ma opcjonalny parametr underlyingSink
, który reprezentuje obiekt z metodami i właściwościami definiującymi zachowanie utworzonej instancji strumienia.
underlyingSink
Element underlyingSink
może zawierać te opcjonalne metody zdefiniowane przez dewelopera: Parametr controller
przekazywany do niektórych metod jest zmienną typu WritableStreamDefaultController
.
start(controller)
: ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Treść tej metody powinna umożliwiać uzyskanie dostępu do podstawowego odbiornika. Jeśli ten proces ma być wykonywany asynchronicznie, może zwrócić obietnicę, która sygnalizuje powodzenie lub niepowodzenie.write(chunk, controller)
: ta metoda zostanie wywołana, gdy nowy fragment danych (określony w parametrzechunk
) będzie gotowy do zapisania w podstawowym odbiorniku. Może zwrócić obietnicę sygnalizującą powodzenie lub niepowodzenie operacji zapisu. Ta metoda zostanie wywołana tylko po pomyślnym zapisaniu poprzednich danych, nigdy po zamknięciu lub przerwaniu strumienia.close(controller)
: ta metoda zostanie wywołana, jeśli aplikacja sygnalizuje, że zakończyła zapisywanie fragmentów do strumienia. Treści powinny wykonać wszystkie niezbędne czynności, aby dokończyć zapisywanie do podstawowego odbiornika i udostępnić do niego dostęp. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Ta metoda zostanie wywołana dopiero po pomyślnym zapisaniu wszystkich danych z kolejki.abort(reason)
: ta metoda zostanie wywołana, jeśli aplikacja sygnalizuje, że chce gwałtownie zamknąć strumień i ustawić go w stanie błędu. Może ona usunąć wszystkie zablokowane zasoby, podobnie jakclose()
, ale metodaabort()
zostanie wywołana nawet wtedy, gdy operacje zapisu są w kolejce. Te fragmenty zostaną odrzucone. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Parametrreason
zawiera wartośćDOMString
, która opisuje, dlaczego transmisja została przerwana.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Interfejs interfejsu Streams API WritableStreamDefaultController
reprezentuje kontroler, który umożliwia kontrolowanie stanu WritableStream
podczas konfigurowania, gdy przesyłane są kolejne fragmenty do zapisu lub po zakończeniu zapisu. Podczas tworzenia WritableStream
do podstawowego odbiornika przekazywany jest odpowiedni obiekt WritableStreamDefaultController
, którym można manipulować. WritableStreamDefaultController
ma tylko jedną metodę:
WritableStreamDefaultController.error()
,
co powoduje, że wszelkie przyszłe interakcje z powiązanym strumieniem powodują błąd.
Obiekt WritableStreamDefaultController
obsługuje też właściwość signal
, która zwraca instancję AbortSignal
, co umożliwia w razie potrzeby zatrzymanie operacji WritableStream
.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Drugi, również opcjonalny, argument konstruktora WritableStream()
to queuingStrategy
.
Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania dla strumienia. Przyjmuje on 2 parametry:
highWaterMark
: nieujemna liczba wskazująca wysoki poziom ścieżki audio, która korzysta z tej strategii kolejkowania.size(chunk)
: funkcja, która oblicza i zwraca skończony nieujemny rozmiar danej wartości fragmentu. Wynik służy do określania ciśnienia zwrotnego, które jest widoczne w odpowiedniej właściwościWritableStreamDefaultWriter.desiredSize
.
Metody getWriter()
i write()
Aby zapisywać dane w strumieniach do zapisu, potrzebujesz elementu zapisującego, którym jest WritableStreamDefaultWriter
. Metoda getWriter()
interfejsu WritableStream
zwraca nową instancję WritableStreamDefaultWriter
i blokuje strumień w tej instancji. Podczas blokowania strumienia nie można użyć innego narzędzia do pisania, dopóki nie zostanie zwolniony bieżący strumień.
Metoda write()
interfejsu WritableStreamDefaultWriter
zapisuje przekazany fragment danych w obiekcie WritableStream
i jego docelowym odbiorniku, a potem zwraca obietnicę, która wskazuje na powodzenie lub niepowodzenie operacji zapisu. Pamiętaj, że znaczenie terminu „sukces” zależy od podstawowego miejsca docelowego. Może on oznaczać, że fragment został zaakceptowany, ale niekoniecznie, że został bezpiecznie zapisany w miejscu docelowym.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
Właściwość locked
Aby sprawdzić, czy strumień z możliwością zapisu jest zablokowany, otwórz jego właściwość WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Przykład kodu strumienia z możliwością zapisu
Przykładowy kod poniżej pokazuje wszystkie kroki.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Przesyłanie strumienia do odczytu do strumienia do zapisu
Strumień do odczytu można przekierować do strumienia do zapisu za pomocą metody pipeTo()
strumienia do odczytu.
ReadableStream.pipeTo()
przekazuje bieżącą wartość ReadableStream
do podanej wartości WritableStream
i zwraca obietnicę, która jest spełniana, gdy proces przekazywania zakończy się pomyślnie, lub odrzucana, jeśli wystąpiły jakieś błędy.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Tworzenie strumienia przekształcenia
Interfejs TransformStream
interfejsu Streams API reprezentuje zestaw danych, które można przekształcać. Strumień transformacji tworzysz, wywołując jego konstruktor TransformStream()
, który tworzy i zwraca obiekt strumienia transformacji na podstawie podanych modułów obsługi. Konstruktor TransformStream()
przyjmuje jako pierwszy argument opcjonalny obiekt JavaScript reprezentujący transformer
. Takie obiekty mogą zawierać dowolną z tych metod:
transformer
start(controller)
: ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Zwykle służy to do umieszczania w kole fragmentów prefiksu za pomocącontroller.enqueue()
. Te fragmenty będą odczytywane z czytalnej strony, ale nie będą zależeć od zapisów na stronie z możliwością zapisu. Jeśli ten początkowy proces jest asynchroniczny, na przykład dlatego, że wymaga pewnych działań, aby pobrać fragmenty prefiksu, funkcja może zwrócić obietnicę, aby zasygnalizować sukces lub niepowodzenie. Odrzucona obietnica spowoduje błąd strumienia. KonstruktorTransformStream()
ponownie rzuca wszystkie wyjątki.transform(chunk, controller)
: ta metoda jest wywoływana, gdy nowy fragment zapisany pierwotnie po stronie zapisu jest gotowy do przekształcenia. Implementacja strumienia gwarantuje, że ta funkcja będzie wywoływana tylko po pomyślnym wykonaniu poprzednich przekształceń i nigdy przed zakończeniem działania funkcjistart()
ani po wywołaniu funkcjiflush()
. Ta funkcja wykonuje właściwą pracę związaną z przekształcaniem danych w strumieniach przekształcania. Może kolejkować wyniki za pomocą funkcjicontroller.enqueue()
. Dzięki temu pojedynczy fragment zapisany po stronie zapisu może spowodować, że po stronie odczytu nie będzie żadnych fragmentów lub będzie ich kilka, w zależności od tego, ile razy zostanie wywołana funkcjacontroller.enqueue()
. Jeśli proces przetwarzania jest asynchroniczny, ta funkcja może zwrócić obietnicę, która sygnalizuje powodzenie lub niepowodzenie przetwarzania. Odrzucona obietnica spowoduje błąd zarówno po stronie odczytu, jak i zapisu strumienia transformacji. Jeśli nie podasz metodytransform()
, zostanie użyta transformacja tożsamości, która umieszcza w kole elementy bez zmian z strony z możliwością zapisu na stronie z możliwością odczytu.flush(controller)
: ta metoda jest wywoływana, gdy wszystkie fragmenty zapisane po stronie z możliwością zapisu zostały przekształcone przez pomyślne przekazanie przeztransform()
, a strona z możliwością zapisu ma zostać zamknięta. Zwykle służy to do umieszczania w kolejce fragmentów sufiksów po stronie czytelniczej, zanim zostanie ona zamknięta. Jeśli proces czyszczenia jest asynchroniczny, funkcja może zwrócić obietnicę, aby zasygnalizować sukces lub niepowodzenie. Wynik zostanie przekazany do wywołującego funkcjistream.writable.write()
. Odrzucona obietnica spowoduje błąd zarówno po stronie odczytu, jak i zapisu strumienia. Wyjątek jest traktowany tak samo jak odrzucenie obietnicy.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Strategie kolejkowania writableStrategy
i readableStrategy
Drugi i trzeci opcjonalny parametr konstruktora TransformStream()
to odpowiednio writableStrategy
i readableStrategy
. Są one zdefiniowane odpowiednio w sekcji dotyczącej strumieni do odczytu i do zapisu.
Przykładowy kod przekształcania strumienia
Poniższy przykładowy kod pokazuje działanie prostego przekształcenia strumienia.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Przesyłanie strumienia czytelnego przez strumień transformacji
Metoda pipeThrough()
interfejsu ReadableStream
umożliwia łańcuchowe przesyłanie bieżącego strumienia przez strumień transformacji lub dowolną inną parę zapisu/odczytu. Przekierowanie strumienia zablokuje go na czas przekierowania, uniemożliwiając innym czytelnikom zablokowanie go.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Następujący przykładowy kod (trochę sztuczny) pokazuje, jak zaimplementować wersję fetch()
, która powoduje pisanie wielkimi literami całego tekstu, wykorzystując zwróconą obietnicę odpowiedzi jako strumień i zamieniając na wielkie litery poszczególne fragmenty. Zaletą tego podejścia jest to, że nie musisz czekać na pobranie całego dokumentu, co może mieć ogromne znaczenie w przypadku dużych plików.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Prezentacja
Demo poniżej pokazuje działanie strumieni do odczytu, do zapisu i przekształcania. Zawiera też przykłady pipeThrough()
i pipeTo()
łańcuchów do rur oraz demonstruje tee()
. Opcjonalnie możesz uruchomić demo w osobnym oknie lub wyświetlić kod źródłowy.
przydatne strumienie dostępne w przeglądarce;
W przeglądarce jest wiele przydatnych strumieni. Możesz łatwo utworzyć obiekt blob typu ReadableStream
. Metoda stream() interfejsu Blob
zwraca ReadableStream
, który po odczycie zwraca dane zawarte w blobie. Pamiętaj też, że obiekt File
jest określonym rodzajem obiektu Blob
i może być używany w dowolnym kontekście, w którym można użyć obiektu blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Wersje strumieniowe plików TextDecoder.decode()
i TextEncoder.encode()
nazywają się odpowiednio TextDecoderStream
i TextEncoderStream
.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Kompresowanie i rozpakowywanie plików jest łatwe dzięki strumieniom transformacji CompressionStream
i DecompressionStream
. Przykładowy kod poniżej pokazuje, jak pobrać specyfikację strumieni, skompresować ją (gzip) bezpośrednio w przeglądarce i zapisać skompresowany plik bezpośrednio na dysku.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
Interfejs File System Access API FileSystemWritableFileStream
oraz eksperymentalne strumienie żądań fetch()
to przykłady strumieni z możliwością zapisu w praktyce.
Interfejs Serial API intensywnie korzysta z obiektów strumieni do odczytu i zapisu.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Interfejs API WebSocketStream
integruje strumienie z interfejsem WebSocket API.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Przydatne materiały
- Specyfikacja strumieni
- Demo towarzyszące
- Strumienie polyfill
- 2016 r. – rok strumieni danych z sieci
- Asynchroniczne iteratory i generatory
- Wizualizacja transmisji
Podziękowania
Ten artykuł został sprawdzony przez Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley i Adam Rice. Posty na blogu Jake'a Archibalda pomogły mi dużo zrozumieć na temat strumieni. Niektóre przykłady kodu zostały zainspirowane eksperymentami użytkownika GitHuba @bellbind, a część tekstu opiera się na dokumentacji MDN na temat strumieni danych. Autorzy standardu Streams wykonali świetną pracę, tworząc te specyfikacje. Obraz główny udostępnił Ryan Lara na Unsplash.