Dowiedz się, jak używać strumieni czytelnych, zapisywalnych i przekształcanych za pomocą interfejsu Streams API.
Interfejs Streams API umożliwia programowy dostęp do strumieni danych otrzymanych przez sieć lub utworzonych lokalnie za pomocą dowolnych środków, a także ich przetwarzanie za pomocą kodu JavaScript. Strumieniowe przesyłanie polega na rozbiciu zasobu, który chcesz otrzymać, wysłać lub przekształcić, na małe fragmenty, a następnie przetwarzaniu tych fragmentów bit po bicie. Przesyłanie strumieniowe to coś, co przeglądarki robią zawsze, gdy otrzymują zasoby, takie jak HTML lub filmy, które mają być wyświetlane na stronach internetowych. Jednak ta funkcja nie była nigdy dostępna dla JavaScriptu przed wprowadzeniem w 2015 r. strumieni danych.fetch
Wcześniej, jeśli chciałeś przetworzyć jakiś zasób (np. plik wideo lub tekstowy), musiałeś pobrać cały plik, poczekać, aż zostanie on zdeserializowany do odpowiedniego formatu, a następnie przetworzyć go. Dzięki strumieniom dostępnym dla JavaScript wszystko się zmienia. Teraz możesz przetwarzać dane nieprzetworzone za pomocą JavaScriptu stopniowo, gdy tylko będą dostępne na kliencie, bez konieczności generowania bufora, ciągu znaków ani bloba. Umożliwia to wiele zastosowań, w tym te wymienione poniżej:
- Efekty wideo: przesyłanie czytelnego strumienia wideo przez strumień transformacji, który stosuje efekty w czasie rzeczywistym.
- Kompresja i dekompresja danych: przekierowanie strumienia plików przez strumień transformacji, który selektywnie je kompresuje lub dekompresuje.
- Dekodowanie obrazu: przekierowanie strumienia odpowiedzi HTTP przez strumień transformacji, który dekoduje bajty na dane bitmapy, a następnie przez inny strumień transformacji, który przekształca bitmapy w pliki PNG. Jeśli jest zainstalowany w obiekcie
fetch
w ramach usługi, umożliwia to przejrzyste wypełnianie nowych formatów obrazów, takich jak AVIF.
Obsługa przeglądarek
ReadableStream i WritableStream
TransformStream
Podstawowe pojęcia
Zanim omówię szczegółowo różne typy strumieni, przedstawię kilka podstawowych pojęć.
kawałki
Kawałek to pojedynczy element danych zapisywany w strumieniu lub odczytywany z niego. Może to być dowolny typ; strumienie mogą zawierać fragmenty o różnych typach. W większości przypadków nie będzie to najbardziej elementarna jednostka danych w danym strumieniu. Na przykład strumień bajtów może zawierać fragmenty o wielkości 16 KiBUint8Array
zamiast pojedynczych bajtów.
Czytelne strumienie
Czytalny strumień to źródło danych, z którego możesz odczytywać dane. Innymi słowy, dane wychodzą z czytelnego strumienia. Strumień tekstowy to konkretnie instancja klasy ReadableStream
.
Strumienie z możliwością zapisu
Strumień z możliwością zapisu reprezentuje miejsce docelowe danych, do których możesz zapisywać dane. Inaczej mówiąc, dane przechodzą do strumienia do zapisu. Strumień do zapisu to instancja klasy WritableStream
.
Przekształcanie strumieni
Strumień transformacji składa się z 2 strumieni: strumienia do zapisu (czyli strony zapisu) oraz strumienia do odczytu (czyli strony odczytu).
W realnym świecie metaforą tego jest tłumacz symultaniczny, który na bieżąco tłumaczy z jednego języka na inny.
W sposób specyficzny dla strumienia transformacji zapisywanie na stronie z możliwością zapisu powoduje, że nowe dane są dostępne do odczytu po stronie z możliwością odczytu. Konkretnie, strumień transformacji może być reprezentowany przez dowolny obiekt z właściwością writable
i właściwością readable
. Standardowa klasa TransformStream
ułatwia jednak tworzenie par, które są prawidłowo splecione.
Łańcuchy do rur
Strumienie są głównie używane do przesyłania ich do siebie nawzajem. Strumień do odczytu można przekierować bezpośrednio do strumienia do zapisu, używając metody pipeTo()
strumienia do odczytu. Można go też przekierować przez jeden lub więcej strumieni transformacji, używając metody pipeThrough()
strumienia do odczytu. Zbiór połączonych strumieni nazywamy łańcuchem.
Ciśnienie wsteczne
Po utworzeniu łańcucha rur będzie on rozpowszechniać sygnały dotyczące tego, jak szybko elementy powinny przez niego przepływać. Jeśli którykolwiek z etapów w łańcuchu nie może jeszcze przyjąć fragmentów, rozprzestrzenia on sygnał w przeciwnym kierunku przez cały łańcuch, aż do momentu, gdy pierwotne źródło otrzyma informację o zaprzestaniu tak szybkiego generowania fragmentów. Ten proces normalizacji przepływu nazywa się odwrotnym ciśnieniem.
Teeing
Przepływ danych do odczytu może być rozdzielany (nazwa pochodzi od kształtu litery „T” w wielkiej literze) za pomocą metody tee()
.
Spowoduje to zablokowanie strumienia, co oznacza, że nie będzie można go używać bezpośrednio. Spowoduje to jednak utworzenie 2 nowych strumieni, zwanych gałęziami, które można odtwarzać niezależnie.
Jest to ważne, ponieważ strumieni nie można cofnąć ani ponownie uruchomić. Więcej informacji na ten temat znajdziesz poniżej.
Mechanika czytelnego strumienia
Czytelny strumień to źródło danych reprezentowane w JavaScriptie przez obiekt ReadableStream
, który pochodzi z podstawowego źródła. Konstruktor ReadableStream()
tworzy i zwraca obiekt strumienia do odczytu na podstawie podanych modułów obsługi. Istnieją 2 rodzaje źródeł danych:
- Źródła push stale przesyłają dane po uzyskaniu dostępu. To od Ciebie zależy, czy chcesz rozpocząć, wstrzymać lub anulować dostęp do strumienia. Przykłady to strumienie wideo na żywo, zdarzenia wysyłane przez serwer lub WebSockets.
- Źródła typu „pull” wymagają, aby po nawiązaniu połączenia z nimi wysłać do nich wyraźne żądanie danych. Przykłady obejmują operacje HTTP za pomocą wywołań
fetch()
lubXMLHttpRequest
.
Dane strumienia są odczytywane sekwencyjnie w mniejszych fragmentach zwanych kawałkami. Fragmenty umieszczone w strumieniu są dodawane do kolejki. Oznacza to, że są one w kolejce i czekają na odczytanie. Kolejka wewnętrzna śledzi fragmenty, które nie zostały jeszcze przeczytane.
Strategia kolejkowania to obiekt, który określa, jak strumień powinien sygnalizować odwrotny nacisk na podstawie stanu kolejki wewnętrznej. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce ze wskazaną liczbą, zwaną wysoką wartością graniczną.
Fragmenty w strumieniu są odczytywane przez czytelnika. Ten czytnik pobiera dane po kawałku, co pozwala na wykonywanie na nich dowolnych operacji. Czytnik wraz z innym kodem przetwarzania, który jest z nim powiązany, jest nazywany konsumentem.
Kolejna konstrukcja w tym kontekście nazywa się kontroler. Każdy czytelny strumień ma powiązany kontroler, który, jak sama nazwa wskazuje, umożliwia sterowanie strumieniem.
Tylko jeden czytnik może odczytywać strumień naraz. Gdy czytnik zostanie utworzony i zacznie odczytywać strumień (czyli stanie się aktywnym czytnikiem), zostaje zablokowany na nim. Jeśli chcesz, aby inna osoba przejęła czytanie strumienia, musisz najpierw zwolnić pierwszego czytelnika (chociaż możesz też rozgałęzić strumień).
Tworzenie czytelnego strumienia
Aby utworzyć strumień do odczytu, wywołaj jego konstruktor:
ReadableStream()
.
Konstruktor ma opcjonalny argument underlyingSource
, który reprezentuje obiekt z metodami i właściwościami definiującymi zachowanie utworzonej instancji strumienia.
underlyingSource
Może to odbywać się za pomocą tych opcjonalnych metod zdefiniowanych przez dewelopera:
start(controller)
: jest wywoływana natychmiast po utworzeniu obiektu. Metoda może uzyskiwać dostęp do źródła strumienia i wykonywać inne czynności wymagane do skonfigurowania funkcji strumienia. Jeśli ten proces ma być wykonywany asynchronicznie, metoda może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie. Parametrcontroller
przekazany do tej metody to aReadableStreamDefaultController
.pull(controller)
: można go użyć do sterowania strumieniem, gdy pobierane są kolejne fragmenty. Jest ona wywoływana wielokrotnie, dopóki wewnętrzna kolejka fragmentów strumienia nie jest pełna, aż do osiągnięcia maksymalnego poziomu. Jeśli wywołanie funkcjipull()
spowoduje obietnicę, funkcjapull()
nie zostanie wywołana ponownie, dopóki nie zostanie spełniona obietnica. Jeśli obietnica zostanie odrzucona, strumień będzie zawierał błąd.cancel(reason)
: jest wywoływany, gdy użytkownik strumienia anuluje strumień.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
obsługuje te metody:
ReadableStreamDefaultController.close()
zamyka powiązany strumień.ReadableStreamDefaultController.enqueue()
wstawia dany fragment do kolejki w powiązanym strumieniu.ReadableStreamDefaultController.error()
spowoduje, że wszystkie przyszłe interakcje z powiązanym strumieniem będą powodować błędy.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Drugi, również opcjonalny, argument konstruktora ReadableStream()
to queuingStrategy
.
Jest to obiekt, który opcjonalnie definiuje kolejkowanie dla strumienia. Przyjmuje 2 parametry:
highWaterMark
: nieujemna liczba wskazująca wysoki poziom okna transmisji używającej tej strategii kolejkowania.size(chunk)
: funkcja, która oblicza i zwraca skończony nieujemny rozmiar danej wartości fragmentu. Wynik służy do określenia ciśnienia zwrotnego, które jest widoczne w odpowiedniej właściwościReadableStreamDefaultController.desiredSize
. Określa ona też, kiedy wywoływana jest metodapull()
źródła danych.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
metody getReader()
i read()
;
Aby odczytać strumień do odczytu, potrzebujesz czytnika, którym będzie ReadableStreamDefaultReader
.
Metoda getReader()
interfejsu ReadableStream
tworzy czytnik i blokuje strumień. Podczas blokowania strumienia nie można uzyskać dostępu do żadnego innego czytnika, dopóki ten nie zostanie zwolniony.
Metoda read()
interfejsu ReadableStreamDefaultReader
zwraca obietnicę zapewniającą dostęp do następnego fragmentu w wewnętrznej kolejce strumienia. W zależności od stanu strumienia może ona spełnić lub odrzucić dane. Możliwości są następujące:
- Jeśli fragment jest dostępny, obietnica zostanie spełniona za pomocą obiektu w formie
{ value: chunk, done: false }
. - Jeśli strumień zostanie zamknięty, obietnica zostanie spełniona za pomocą obiektu w formie
{ value: undefined, done: true }
. - Jeśli strumień będzie zawierał błąd, obietnica zostanie odrzucona z odpowiednim komunikatem o błędzie.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
Właściwość locked
Aby sprawdzić, czy strumień do odczytu jest zablokowany, otwórz obiekt ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
czytelne przykłady kodu strumienia danych.
Przykładowy kod poniżej pokazuje wszystkie te kroki w akcji. Najpierw tworzysz klasę ReadableStream
, która w swoim argumencie underlyingSource
(czyli klasie TimestampSource
) definiuje metodę start()
.
Ta metoda informuje controller
strumienia, abyenqueue()
dodawał sygnaturę czasową co sekundę przez 10 sekund.
Na koniec przekazuje kontrolerowi polecenie close()
strumienia. Odczytujesz ten strumień, tworząc czytnik za pomocą metody getReader()
i wywołując read()
, aż strumień będzie done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iteracja asynchroniczna
Sprawdzanie w każdej iteracji pętli read()
, czy strumień jest done
, może nie być najwygodniejszym interfejsem API.
Na szczęście wkrótce pojawi się lepszy sposób: iteracja asynchroniczna.
for await (const chunk of stream) {
console.log(chunk);
}
Aby obecnie korzystać z iteracji asynchronicznej, możesz zaimplementować to zachowanie za pomocą polyfilla.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Odczytywanie strumienia danych
Metoda tee()
interfejsu ReadableStream
rozdziela bieżący odczytywalny strumień, zwracając tablicę o 2 elementach zawierającą 2 wynikowe gałęzie jako nowe instancje ReadableStream
. Dzięki temu 2 czytelnicy mogą jednocześnie czytać strumień. Możesz to zrobić na przykład w skrypcie service worker, jeśli chcesz pobrać odpowiedź z serwera i przesłać ją do przeglądarki, a także do pamięci podręcznej skryptu service worker. Ponieważ treści odpowiedzi nie można wykorzystać więcej niż raz, musisz utworzyć 2 kopie. Aby anulować strumień, musisz anulować oba powstałe rozgałęzienia. Przekierowanie strumienia zablokuje go na czas trwania, uniemożliwiając innym użytkownikom zablokowanie go.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
czytelne strumienie bajtów;
W przypadku strumieni reprezentujących bajty udostępniana jest rozszerzona wersja czytelnego strumienia, która umożliwia wydajne przetwarzanie bajtów, w szczególności poprzez minimalizowanie kopii. Strumienie bajtów umożliwiają nabywanie czytników typu „przynieś własną pamięć podręczną” (BYOB). Domyślna implementacja może dawać różne dane wyjściowe, takie jak ciągi znaków lub tablice buforów w przypadku WebSockets, podczas gdy strumienie bajtów gwarantują dane wyjściowe w postaci bajtów. Ponadto czytelnicy mogą korzystać z dodatkowych funkcji stabilności. Jeśli bufor się odłączy, może to spowodować, że nie zapiszesz dwa razy do tego samego bufora, co zapobiega sytuacjom wyścigu. Czytniki BYOB mogą zmniejszyć liczbę razy, ile przeglądarka musi wykonać zbiórka, ponieważ może ponownie używać buforów.
Tworzenie czytelnego strumienia bajtów
Aby utworzyć czytelny strumień bajtów, możesz przekazać dodatkowy parametr type
do konstruktora ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
Źródło strumienia bajtów do odczytu ma uprawnienia ReadableByteStreamController
do manipulowania. Metoda ReadableByteStreamController.enqueue()
przyjmuje argument chunk
, którego wartość jest typu ArrayBufferView
. Właściwość ReadableByteStreamController.byobRequest
zwraca bieżącą prośbę o przechwycenie z repozytorium BYOB lub wartość null, jeśli nie ma takiej prośby. Wreszcie właściwość ReadableByteStreamController.desiredSize
zwraca pożądany rozmiar, aby wypełnić kolejkę wewnętrzną kontrolowanego strumienia.
queuingStrategy
Drugi, również opcjonalny, argument konstruktora ReadableStream()
to queuingStrategy
.
Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania strumienia. Przyjmuje on jeden parametr:
highWaterMark
: nieujemna liczba bajtów wskazująca wysoki poziom strumienia przy użyciu tej strategii kolejkowania. Służy to określeniu ciśnienia wstecznego, które jest widoczne w odpowiedniej właściwościReadableByteStreamController.desiredSize
. Określa ona też, kiedy wywoływana jest metodapull()
źródła danych.
metody getReader()
i read()
;
Następnie możesz uzyskać dostęp do ReadableStreamBYOBReader
, odpowiednio ustawiając parametr mode
:
ReadableStream.getReader({ mode: "byob" })
. Umożliwia to dokładniejsze kontrolowanie przydziału bufora, aby uniknąć kopiowania. Aby odczytać dane z potoku bajtów, musisz wywołać funkcję ReadableStreamBYOBReader.read(view)
, gdzie view
to ArrayBufferView
.
Przykładowy kod strumienia bajtów w czytelnej postaci
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Podana niżej funkcja zwraca czytelne strumienie bajtów, które umożliwiają wydajne odczytywanie tablicy wygenerowanej losowo bez kopiowania. Zamiast używać z góry określonego rozmiaru fragmentu (1024), próbuje wypełnić bufor dostarczony przez dewelopera, co zapewnia pełną kontrolę.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Mechanika strumienia z możliwością zapisu
Możliwość zapisu w strumieniu to miejsce docelowe, do którego możesz zapisywać dane reprezentowane w JavaScriptzie przez obiekt WritableStream
. Jest to abstrakcja na szczycie podstawy odbiornika, czyli odbiornika we/wy na niższym poziomie, do którego zapisywane są dane nieprzetworzone.
Dane są zapisywane w strumieniach za pomocą programu do zapisywania, po jednym kawałku naraz. Kawałek może przybierać wiele form, tak jak kawałki w czytniku. Do wygenerowania fragmentów gotowych do zapisania możesz użyć dowolnego kodu. Pisarz wraz z powiązanym kodem to producent.
Gdy nowy pisarz zaczyna pisać do strumienia (aktywny pisarz), mówi się, że jest zablokowany. Do strumienia do zapisu może pisać tylko 1 program. Jeśli chcesz, aby inny pisarz zaczął pisać do Twojego strumienia, musisz go najpierw uwolnić, a potem dołączyć do niego innego pisarza.
Kolejka wewnętrzna śledzi fragmenty zapisane w strumieniach, które nie zostały jeszcze przetworzone przez docelowy odbiornik.
Strategia kolejkowania to obiekt, który określa, jak strumień powinien sygnalizować odwrotny nacisk na podstawie stanu kolejki wewnętrznej. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce ze wskazaną liczbą, zwaną wysoką wartością graniczną.
Ostatnia konstrukcja jest nazywana kontrolerem. Każdy strumień z możliwością zapisu ma powiązany kontroler, który umożliwia sterowanie tym strumieniem (np. jego przerwanie).
Tworzenie strumienia do zapisu
Interfejs WritableStream
interfejsu Streams API zapewnia standardową abstrakcję do zapisywania danych strumieniowych w miejscu docelowym, zwanym odbiornikiem. Ten obiekt ma wbudowane zabezpieczenie przed przeciążeniem i kolejkowanie. Strumień do zapisu tworzysz, wywołując jego konstruktor:
WritableStream()
.
Ma on opcjonalny parametr underlyingSink
, który reprezentuje obiekt z metodami i właściwościami definiującymi sposób działania utworzonej instancji strumienia.
underlyingSink
underlyingSink
może zawierać te opcjonalne metody zdefiniowane przez dewelopera. Parametr controller
przekazywany do niektórych metod jest zmienną typu WritableStreamDefaultController
.
start(controller)
: ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Treść tej metody powinna umożliwiać uzyskanie dostępu do podstawowego odbiornika. Jeśli ten proces ma być wykonywany asynchronicznie, może zwrócić obietnicę, aby zasygnalizować powodzenie lub niepowodzenie.write(chunk, controller)
: ta metoda zostanie wywołana, gdy nowy fragment danych (określony w parametrzechunk
) będzie gotowy do zapisania w podstawowym odbiorniku. Może zwracać obietnicę, aby zasygnalizować powodzenie lub niepowodzenie operacji zapisu. Ta metoda zostanie wywołana tylko po pomyślnym zapisaniu poprzednich danych, nigdy po zamknięciu lub przerwaniu strumienia.close(controller)
: ta metoda zostanie wywołana, jeśli aplikacja sygnalizuje, że zakończyła zapisywanie fragmentów do strumienia. Treści powinny zrobić wszystko, co jest konieczne, aby dokończyć zapisywanie do podstawowego odbiornika i udostępnić do niego dostęp. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę, która sygnalizuje powodzenie lub niepowodzenie. Ta metoda zostanie wywołana dopiero po pomyślnym zapisaniu wszystkich danych z kolejki.abort(reason)
: ta metoda zostanie wywołana, jeśli aplikacja sygnalizuje, że chce gwałtownie zamknąć strumień i ustawić go w stanie błędu. Może ona wyczyścić wszystkie zablokowane zasoby, podobnie jakclose()
, ale metodaabort()
zostanie wywołana nawet wtedy, gdy operacje zapisu są w kolejce. Te fragmenty zostaną odrzucone. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę, która sygnalizuje powodzenie lub niepowodzenie. Parametrreason
zawiera wartośćDOMString
, która opisuje, dlaczego transmisja została przerwana.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Interfejs interfejsu Streams API WritableStreamDefaultController
reprezentuje kontroler, który umożliwia kontrolowanie stanu WritableStream
podczas konfigurowania, gdy przesyłane są kolejne fragmenty do zapisu lub po zakończeniu zapisu. Podczas tworzenia WritableStream
do podstawowego odbiornika przekazywany jest odpowiedni obiekt WritableStreamDefaultController
, którym można manipulować. WritableStreamDefaultController
ma tylko jedną metodę:
WritableStreamDefaultController.error()
,
co powoduje, że wszelkie przyszłe interakcje z powiązanym strumieniem powodują błąd.
Obiekt WritableStreamDefaultController
obsługuje też właściwość signal
, która zwraca instancję AbortSignal
, co umożliwia zatrzymanie operacji WritableStream
w razie potrzeby.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Drugi, również opcjonalny, argument konstruktora WritableStream()
to queuingStrategy
.
Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania strumienia. Przyjmuje ona 2 parametry:
highWaterMark
: nieujemna liczba wskazująca wysoki poziom okna transmisji używającej tej strategii kolejkowania.size(chunk)
: funkcja, która oblicza i zwraca skończony nieujemny rozmiar danej wartości fragmentu. Wynik służy do określenia ciśnienia zwrotnego, które jest widoczne w odpowiedniej właściwościWritableStreamDefaultWriter.desiredSize
.
metody getWriter()
i write()
;
Aby zapisywać dane w strumieniach do zapisu, musisz mieć element zapisu, którym jest WritableStreamDefaultWriter
. Metoda getWriter()
interfejsu WritableStream
zwraca nową instancję WritableStreamDefaultWriter
i blokuje strumień w tej instancji. Podczas blokowania strumienia nie można dodać żadnego innego pisarza, dopóki nie zostanie zwolniony bieżący.
Metoda write()
interfejsu WritableStreamDefaultWriter
zapisuje przekazany fragment danych w obiekcie WritableStream
i jego docelowym odbiorniku, a następnie zwraca obietnicę, która wskazuje na powodzenie lub niepowodzenie operacji zapisu. Pamiętaj, że znaczenie terminu „sukces” zależy od docelowego miejsca docelowego. Może on oznaczać, że fragment został zaakceptowany, ale niekoniecznie, że został bezpiecznie zapisany w docelowym miejscu.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
Właściwość locked
Aby sprawdzić, czy strumień z możliwością zapisu jest zablokowany, otwórz jego właściwość WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Przykład kodu strumienia z możliwością zapisu
Przykładowy kod poniżej pokazuje wszystkie kroki.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Przesyłanie strumienia do odczytu do strumienia do zapisu
Strumień do odczytu można przekierować do strumienia do zapisu za pomocą metody pipeTo()
strumienia do odczytu.
ReadableStream.pipeTo()
przekazuje bieżącą wartość ReadableStream
do podanej wartości WritableStream
i zwraca obietnicę, która jest wypełniana po pomyślnym zakończeniu procesu przekazywania lub odrzucana, jeśli wystąpiły błędy.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Tworzenie strumienia przekształcenia
Interfejs TransformStream
interfejsu Streams API reprezentuje zestaw danych, które można przekształcać. Strumień transformacji tworzysz, wywołując jego konstruktor TransformStream()
, który tworzy i zwraca obiekt strumienia transformacji na podstawie podanych modułów obsługi. Konstruktor TransformStream()
przyjmuje jako pierwszy argument opcjonalny obiekt JavaScript reprezentujący transformer
. Takie obiekty mogą zawierać dowolną z tych metod:
transformer
start(controller)
: ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Zwykle służy to do umieszczania w kole fragmentów prefiksu za pomocą parametrucontroller.enqueue()
. Te fragmenty będą odczytywane z czytalnej strony, ale nie będą zależeć od zapisów na stronie z możliwością zapisu. Jeśli ten początkowy proces jest asynchroniczny, na przykład dlatego, że wymaga pewnych działań, aby pobrać fragmenty prefiksu, funkcja może zwrócić obietnicę, aby zasygnalizować sukces lub niepowodzenie. Odrzucona obietnica spowoduje błąd strumienia. KonstruktorTransformStream()
ponownie rzuca wszystkie wyjątki.transform(chunk, controller)
: ta metoda jest wywoływana, gdy nowy fragment zapisany pierwotnie po stronie zapisu jest gotowy do przekształcenia. Implementacja strumienia gwarantuje, że ta funkcja będzie wywoływana tylko po pomyślnym wykonaniu poprzednich przekształceń i nigdy przed zakończeniem działania funkcjistart()
ani po wywołaniu funkcjiflush()
. Ta funkcja wykonuje właściwą pracę związaną z przekształcaniem danych w strumieniach przekształcania. Może kolejkować wyniki za pomocą funkcjicontroller.enqueue()
. Dzięki temu pojedynczy fragment zapisany po stronie zapisu może spowodować, że po stronie odczytu nie będzie żadnych fragmentów lub będzie ich kilka, w zależności od tego, ile razy wywołano funkcjęcontroller.enqueue()
. Jeśli proces przetwarzania jest asynchroniczny, ta funkcja może zwrócić obietnicę, która sygnalizuje powodzenie lub niepowodzenie przetwarzania. Odrzucona obietnica spowoduje błąd zarówno po stronie odczytu, jak i zapisu strumienia transformacji. Jeśli nie podasz metodytransform()
, zostanie użyta transformacja tożsamości, która umieszcza w kolejce niezmienione fragmenty z zapisywalnej strony na stronę czytelną.flush(controller)
: ta metoda jest wywoływana, gdy wszystkie fragmenty zapisane po stronie z możliwością zapisu zostały przekształcone przez udane przekazanie przeztransform()
, a strona z możliwością zapisu ma zostać zamknięta. Zwykle służy to do umieszczania w kolejce fragmentów sufiksów po stronie czytelniczej, zanim zostanie ona zamknięta. Jeśli proces czyszczenia jest asynchroniczny, funkcja może zwrócić obietnicę, aby zasygnalizować sukces lub niepowodzenie. Wynik zostanie przekazany do wywołującego funkcjistream.writable.write()
. Odrzucona obietnica spowoduje błąd zarówno po stronie odczytu, jak i zapisu strumienia. Wyjątek jest traktowany tak samo jak odrzucenie obietnicy.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Strategie kolejkowania writableStrategy
i readableStrategy
Drugi i trzeci opcjonalny parametr konstruktora TransformStream()
to odpowiednio strategie kolejkowania writableStrategy
i readableStrategy
. Są one zdefiniowane odpowiednio w sekcji dotyczącej strumieni do odczytu i do zapisu.
Przykładowy kod przekształcania strumienia
Poniższy przykładowy kod pokazuje działanie prostego przekształcenia strumienia.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Przesyłanie strumienia do odczytu przez strumień do transformacji
Metoda pipeThrough()
interfejsu ReadableStream
umożliwia łańcuchowe przesyłanie bieżącego strumienia przez strumień przekształcenia lub dowolną inną parę zapisu/odczytu. Przekierowanie strumienia zablokuje go na czas trwania przekierowania, uniemożliwiając innym czytelnikom jego zablokowanie.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Następujący przykładowy kod (trochę sztuczny) pokazuje, jak zastosować wersję fetch()
, która powoduje pisanie wszystkich znaków wielkimi literami, wykorzystując zwróconą obietnicę odpowiedzi jako strumień i przekształcając poszczególne fragmenty w duże litery. Zaletą tego podejścia jest to, że nie musisz czekać na pobranie całego dokumentu, co może mieć ogromne znaczenie w przypadku dużych plików.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Prezentacja
Demonstracja poniżej pokazuje działanie strumieni czytalnych, zapisywalnych i przekształcających. Zawiera też przykłady pipeThrough()
i pipeTo()
łańcuchów przewodów oraz demonstruje tee()
. Opcjonalnie możesz uruchomić demo w osobnym oknie lub wyświetlić kod źródłowy.
Przydatne strumienie dostępne w przeglądarce
W przeglądarce jest wiele przydatnych strumieni danych. Możesz łatwo utworzyć ReadableStream
z bloba. Metoda stream() interfejsu Blob
zwraca ReadableStream
, który po odczycie zwraca dane zawarte w pliku blob. Pamiętaj też, że obiekt File
to specyficzny rodzaj obiektu Blob
, który można używać w dowolnym kontekście, w którym można używać obiektów blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Wersje strumieniowe TextDecoder.decode()
i TextEncoder.encode()
nazywają się odpowiednio TextDecoderStream
i TextEncoderStream
.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Kompresowanie i rozpakowywanie plików jest proste dzięki strumieniom transformacji CompressionStream
i DecompressionStream
. Przykładowy kod poniżej pokazuje, jak pobrać specyfikację strumieni, skompresować ją (gzip) bezpośrednio w przeglądarce i zapisać skompresowany plik bezpośrednio na dysku.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
Interfejs File System Access API FileSystemWritableFileStream
i eksperymentalne fetch()
strumień zapytań to przykłady strumieni z możliwością zapisu w praktyce.
Interfejs Serial API intensywnie korzysta zarówno ze strumieni do odczytu, jak i do zapisu.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
I wreszcie interfejs API WebSocketStream
integruje strumienie z interfejsem WebSocket API.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Przydatne materiały
- Specyfikacja strumieni
- Demo towarzyszące
- Stream polyfill
- 2016 r. – rok strumieni danych z sieci
- Asynchroniczne iteratory i generatory
- Wizualizacja transmisji
Podziękowania
Ten artykuł został sprawdzony przez Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley i Adam Rice. Posty na blogu Jake'a Archibalda bardzo mi pomogły w zrozumieniu strumieni danych. Niektóre przykłady kodu zostały zainspirowane przez użytkownika GitHuba @bellbind, a niektóre fragmenty tekstu w dużym stopniu opierają się na dokumentacji MDN na temat strumieni danych. Autorzy standardu Streams Standard wykonali świetną pracę, tworząc te specyfikacje. Obraz główny udostępnił Ryan Lara na Unsplash.