Dowiedz się, jak za pomocą interfejsu Streams API używać czytelnych i możliwych do zapisu strumieni oraz strumieni przekształcania.
Interfejs Streams API umożliwia automatyczny dostęp do strumieni danych otrzymanych przez sieć
lub tworzone w inny sposób lokalnie,
i przetwarzać je za pomocą JavaScriptu. Strumieniowanie polega na rozbiciu zasobu, który chcesz odebrać, wysłać lub przekształcić
na małe kawałki, a następnie przetwarzamy je krok po kroku. Strumieniowe przesyłanie danych
przeglądarki i tak otrzymują zasoby, takie jak HTML czy filmy, które mają być wyświetlane na stronach internetowych,
Usługa fetch
nie była dostępna w języku JavaScript przed wprowadzeniem strumieni w 2015 roku.
Wcześniej, aby przetworzyć określony zasób (np. film lub plik tekstowy), musisz pobrać cały plik, poczekać na deserializację do odpowiedniego formatu a potem przetwarzać. Transmisje są dostępne dla: JavaScript, to wszystko się zmienia. Możesz teraz przetwarzać nieprzetworzone dane za pomocą JavaScriptu stopniowo w miarę gdy tylko będzie dostępny u klienta, bez konieczności generowania bufora, ciągu znaków czy obiektu blob. Odblokowuje to szereg przypadków użycia. Niektóre z nich wymieniam poniżej:
- Efekty wideo: przytaczanie czytelnego strumienia wideo przez strumień przekształcenia, który stosuje efekty. w czasie rzeczywistym.
- (de)kompresja danych: przytaczanie strumienia plików przez strumień przekształcenia, który wybiórczo (de) kompresuje go.
- Dekodowanie obrazów: potok odpowiedzi HTTP przez strumień przekształcenia, który dekoduje bajty.
do formatu bitmapy, a potem przez inny strumień przekształcenia, który przekształca mapy bitowe na pliki PNG. Jeśli
zainstalowane w module obsługi
fetch
skryptu service worker, umożliwia to przejrzystość kodu polyfill takich jak AVIF.
Obsługa przeglądarek
ReadableStream i WritableStream
TransformStream
Podstawowe pojęcia
Zanim przejdziemy do szczegółowych informacji o różnych typach transmisji, przedstawię kilka podstawowych pojęć.
Kawałki
Fragment to pojedynczy fragment danych zapisywany w strumieniu lub z niego odczytywany. Może to być dowolny
type; strumienie mogą zawierać nawet fragmenty różnych typów. Przeważnie fragment nie będzie najbardziej atomowy,
dla danego strumienia. Na przykład strumień bajtów może zawierać fragmenty składające się z 16
KiB jednostek Uint8Array
zamiast pojedynczych bajtów.
Strumienie do odczytu
Czytelny strumień reprezentuje źródło danych, z którego można odczytywać dane. Inaczej mówiąc, dane mogą
w czytelny strumień. Czytelny strumień to instancja instancji ReadableStream
zajęcia.
Strumienie z możliwością zapisu
Strumień z możliwością zapisu reprezentuje miejsce docelowe danych, w których możesz je zapisywać. Inaczej mówiąc, dane
przekazuje się do strumienia możliwego do zapisu. Strumień możliwy do zapisu to przykład
WritableStream
zajęcia.
Strumienie przekształcania
Strumień przekształcenia składa się z pary strumieni: strumienia, który można zapisywać, nazywanego stroną z możliwością zapisu.
i czytelna strona.
Prawdziwą metaforą byłaby
tłumacz symultaniczny
który na bieżąco tłumaczy z jednego języka na inny.
W sposób charakterystyczny dla strumienia przekształcania,
powoduje udostępnienie nowych danych do odczytu
czytelna strona. Oznacza to, że każdy obiekt z właściwością writable
i readable
może wyświetlać reklamy
jako strumień przekształcenia. Standardowa klasa TransformStream
ułatwia jednak tworzenie
która jest odpowiednio zaplątana.
Łańcuchy potoków
Strumienie są używane przede wszystkim przez połączenie ich ze sobą. Czytelny strumień można dodać bezpośrednio do potoku
do strumienia z możliwością zapisu, używając metody pipeTo()
zrozumiałego strumienia. Można też przekazać go przy użyciu
lub więcej strumieni przekształcenia, korzystając z metody pipeThrough()
zrozumiałego strumienia. Zestaw
strumienie połączone potokiem w ten sposób nazywamy łańcuchem rur.
Ciśnienie wsteczne
Po utworzeniu łańcucha potoków zostaną przesłane sygnały dotyczące szybkości przepływu fragmentów i przechodzić przez niego. Jeśli dowolny krok w łańcuchu nie może jeszcze przyjmować fragmentów, przesyła sygnał wstecz w łańcuchu rur, aż w końcu ukaże się, że pierwotne źródło nie generuje już fragmentów, szybko. Ten proces normalizowania przepływu jest nazywany obciążeniem wstecznym.
Rozgrywka
Czytelny strumień można powiązać (nazwając go zgodnie z kształtem wielkiej litery „T”) przy użyciu metody tee()
.
Spowoduje to zablokowanie strumienia, czyli nie będzie już można z niej korzystać bezpośrednio. ale utworzy 2 nowe
strumienie, nazywane gałęziami, które mogą być wykorzystywane niezależnie.
Odtwarzanie jest też ważne, ponieważ transmisji nie można przewinąć ani ponownie uruchomić. Więcej informacji na ten temat znajdziesz później.
Mechanika czytelnego strumienia
Czytelny strumień to źródło danych reprezentowane w JavaScripcie przez tag
ReadableStream
obiektem, który
i spływają z podstawowego źródła.
ReadableStream()
konstruktor tworzy i zwraca czytelny obiekt strumienia z podanych modułów obsługi. Dostępne są 2
typy bazowego źródła:
- Źródła push nieustannie przesyłają Ci dane, gdy masz do nich dostęp. Ty decydujesz, rozpocząć, wstrzymać lub anulować dostęp do transmisji. Mogą to być na przykład transmisje wideo na żywo, zdarzenia wysłane przez serwer, czyli WebSockets.
- Pobieranie źródeł wymaga jawnego żądania danych z tych źródeł po połączeniu z nimi. Przykłady
uwzględnia operacje HTTP za pomocą wywołań funkcji
fetch()
lubXMLHttpRequest
.
Strumieniowe dane są odczytywane sekwencyjnie w postaci małych fragmentów. Fragmenty umieszczone w strumieniu są umieszczone w kolejce. To oznacza, że czekają w kolejce gotowe do przeczytania. Wewnętrzna kolejka przechowuje fragmenty, które nie zostały jeszcze przeczytane.
Strategia kolejkowania to obiekt określający sposób, w jaki strumień powinien sygnalizować obciążenie wsteczne na podstawie o stanie jej wewnętrznej kolejki. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce do określonej liczby, czyli do wysokiego znaku wodnego.
Fragmenty wewnątrz strumienia są odczytywane przez czytelnika. Ten czytnik pobiera dane po jednym fragmencie co pozwala wykonać dowolne operacje. Czytelnik plus drugi wraz z nim kod przetwarzania jest nazywany konsumentem.
Następny konstrukt w tym kontekście to kontroler. Każdy możliwy do odczytania strumień ma powiązane który, jak sama nazwa wskazuje, umożliwia sterowanie strumieniem.
Tylko 1 czytelnik może w danym momencie odczytać strumień. gdy użytkownik zostanie utworzony i zacznie czytać strumień. (czyli staje się aktywnym czytnikiem), jest na nim zablokowany. Jeśli chcesz, aby inny czytelnik przejął kontrolę gdy czytasz strumień, musisz zwykle zwolnić pierwszego czytelnika, zanim wykonasz inne czynności. (ale możesz oglądać transmisje).
Tworzenie czytelnego strumienia
Generując czytelny strumień, wywołując jego konstruktor
ReadableStream()
Konstruktor ma opcjonalny argument underlyingSource
, który reprezentuje obiekt.
z metodami i właściwościami, które określają, jak będzie zachowywać się utworzona instancja strumienia.
underlyingSource
Mogą to być następujące opcjonalne metody zdefiniowane przez programistę:
start(controller)
: wywoływana natychmiast po utworzeniu obiektu. metoda może uzyskać dostęp do źródła strumienia i wykonać inne czynności wymagane do skonfigurowania funkcji strumieniowania. Jeśli ten proces ma zostać wykonany asynchronicznie, metoda może zwracają obietnicę sukcesu lub porażki. Parametrcontroller
przekazywany do tej metody to wReadableStreamDefaultController
pull(controller)
: może służyć do sterowania strumieniem podczas pobierania kolejnych fragmentów. it jest wywoływana wielokrotnie, dopóki wewnętrzna kolejka fragmentów strumienia nie będzie pełna, aż do momentu osiąga wysoki znak wodny. Jeśli wynik wywołania funkcjipull()
jest obiecujący, Usługapull()
nie zostanie wywołana ponownie, dopóki ta obietnica nie zostanie zrealizowana. Jeśli obietnica zostanie odrzucona, podczas transmisji wystąpi błąd.cancel(reason)
: wywołanie, gdy konsument anuluje transmisję.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
obsługuje te metody:
ReadableStreamDefaultController.close()
zamyka powiązany strumień.ReadableStreamDefaultController.enqueue()
dodaje dany fragment do kolejki w powiązanym strumieniu.ReadableStreamDefaultController.error()
spowoduje błąd we wszystkich przyszłych interakcjach z powiązanym strumieniem.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Drugim, podobnie opcjonalnym argumentem konstruktora ReadableStream()
, jest queuingStrategy
.
Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania strumienia, która składa się z dwóch
parametry:
highWaterMark
: liczba nieujemna wskazująca na wysoki znak wodny strumienia przy użyciu tej strategii kolejkowania.size(chunk)
: funkcja, która oblicza i zwraca skończony, nieujemny rozmiar danej wartości fragmentu. Wynik służy do określania naprężenia wstecznego, które pojawia się za pomocą odpowiedniej właściwościReadableStreamDefaultController.desiredSize
. Określa też to, kiedy wywoływana jest metodapull()
źródła.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Metody getReader()
i read()
Aby czytać z czytelnego strumienia, potrzebujesz czytnika, który będzie
ReadableStreamDefaultReader
Metoda getReader()
interfejsu ReadableStream
tworzy czytnik i blokuje strumień
. Gdy strumień jest zablokowany, nie można pozyskać żadnego innego czytnika, dopóki nie zostanie on zwolniony.
read()
metoda interfejsu ReadableStreamDefaultReader
zwraca obietnicę dającą dostęp do kolejnej
w wewnętrznej kolejce strumienia. Wypełnia lub odrzuca z wynikiem w zależności od stanu
w transmisji na żywo. Istnieją następujące możliwości:
- Jeśli fragment jest dostępny, obietnica zostanie zrealizowana za pomocą obiektu formularza
{ value: chunk, done: false }
- Jeśli strumień zostanie zamknięty, obietnica zostanie zrealizowana przez obiekt formularza
{ value: undefined, done: true }
- Jeśli podczas transmisji wystąpi błąd, obietnica zostanie odrzucona z odpowiednim błędem.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
Właściwość locked
Aby sprawdzić, czy możliwy do odczytania strumień jest zablokowany, otwórz jego
ReadableStream.locked
usłudze.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Przykładowy kod strumienia, który można odczytać
Poniższy przykładowy kod pokazuje wszystkie kroki w praktyce. Najpierw tworzysz ReadableStream
, który w
Argument underlyingSource
(czyli klasa TimestampSource
) definiuje metodę start()
.
Ta metoda informuje właściwość controller
strumienia do
enqueue()
oznacza sygnaturę czasową co sekundę w ciągu 10 sekund.
Na koniec informuje kontroler, że ma close()
strumień. Spożywasz
streamując, tworząc czytnik za pomocą metody getReader()
i wywołując funkcję read()
, dopóki strumień nie zostanie
done
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iteracja asynchroniczna
Sprawdzanie każdej iteracji pętli read()
, jeśli strumień jest done
, może nie być najwygodniejszym interfejsem API.
Na szczęście wkrótce istnieje lepszy sposób wykonywania tych czynności: iteracja asynchroniczna.
for await (const chunk of stream) {
console.log(chunk);
}
Aby obejść ten problem, obecnie można stosować iterację asynchroniczną, stosując kod polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Tworzenie czytelnego strumienia
Metoda tee()
funkcji
Interfejs ReadableStream
łączy bieżący czytelny strumień, zwracając tablicę dwuelementową
zawierające dwie powstałe gałęzie jako nowe instancje ReadableStream
. Dzięki temu
2 czytelników może jednocześnie czytać strumień. Można to zrobić na przykład w mechanizmie Service Worker, jeśli
chcesz pobrać odpowiedź z serwera i przesłać ją strumieniowo do przeglądarki, ale także przesyłać ją strumieniowo
pamięci podręcznej skryptu service worker. Treść odpowiedzi nie może być wykorzystana więcej niż raz, więc potrzebujesz 2 kopii
w tym celu. Aby anulować strumień, musisz najpierw anulować obie zawarte w nim gałęzie. Rozpoczynanie transmisji
zwykle blokuje się na określony czas, przez co inni czytelnicy nie mogą go zablokować.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Czytelne strumienie bajtów
Dla strumieni reprezentujących bajty dostarczana jest rozszerzona wersja czytelnego strumienia do obsługi i oszczędność bajtów, zwłaszcza przez zminimalizowanie liczby kopii. Strumienie bajtów umożliwiają korzystanie z własnego bufora (BYOB) czytelników do pozyskania. Domyślna implementacja może dawać zakres różnych danych wyjściowych, takich jak jako ciągi lub bufory tablicy w przypadku technologii WebSockets, podczas gdy strumienie bajtów gwarantują wynik w bajtach. Oprócz tego czytelnicy BYOB mają korzyści ze stabilności. To jest ponieważ odłączenie bufora może zagwarantować, że nie zapisze się on dwa razy w tym samym buforze, co pozwala uniknąć wyścigów. Czytniki BYOB mogą zmniejszyć liczbę uruchomień przeglądarki czyszczenia pamięci, ponieważ może ono wykorzystywać bufory.
Tworzenie czytelnego strumienia bajtów
Możesz utworzyć czytelny strumień bajtów, przekazując dodatkowy parametr type
do funkcji
ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
Bazowe źródło zrozumiałego strumienia bajtowego otrzymuje ReadableByteStreamController
do
manipulacja. Metoda ReadableByteStreamController.enqueue()
przyjmuje argument chunk
, którego wartość
ma status ArrayBufferView
. Właściwość ReadableByteStreamController.byobRequest
zwraca aktualną
Żądanie pull BYOB lub wartość null, jeśli nie ma żadnego. Na koniec ReadableByteStreamController.desiredSize
zwraca żądany rozmiar, aby wypełnić wewnętrzną kolejkę kontrolowanego strumienia.
queuingStrategy
Drugim, podobnie opcjonalnym argumentem konstruktora ReadableStream()
, jest queuingStrategy
.
Jest to obiekt, który opcjonalnie definiuje strategię kolejki dla strumienia, która przyjmuje jedną
:
highWaterMark
: nieujemna liczba bajtów wskazująca na wysoki znak wodny strumienia korzystającego z tej strategii kolejkowania. Służy do określania obciążenia wstecznego, które pojawia się za pomocą odpowiedniej właściwościReadableByteStreamController.desiredSize
. Określa też to, kiedy wywoływana jest metodapull()
źródła.
Metody getReader()
i read()
Aby uzyskać dostęp do elementu ReadableStreamBYOBReader
, ustaw odpowiednio parametr mode
:
ReadableStream.getReader({ mode: "byob" })
Umożliwia to dokładniejszą kontrolę bufora.
aby uniknąć kopii. Aby odczytać strumień bajtów, musisz wywołać
ReadableStreamBYOBReader.read(view)
, gdzie view
to
ArrayBufferView
.
Przykładowy kod strumienia bajtowego, który można odczytać
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Poniższa funkcja zwraca możliwe do odczytania strumienie bajtów, które umożliwiają wydajny odczyt losowo wygenerowana tablica. Zamiast wstępnie określonego rozmiaru fragmentu 1024 próbuje on wypełnić dzięki buforowi dostarczanemu przez programistę, co daje pełną kontrolę.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Mechanika strumienia, który można zapisać
Strumień z możliwością zapisu to miejsce docelowe, w którym możesz zapisywać dane. Jest ono reprezentowane w JavaScripcie przez
WritableStream
. Ten
pełni funkcję abstrakcji „nad ujściem” bazowego ujścia – ujścia wejścia-wyjścia niższego poziomu,
nieprzetworzonych danych.
Dane są zapisywane w strumieniu przez zapisujący, po jednym fragmencie. Fragment może zająć w wielu formach, tak jak w przypadku fragmentów w czytniku. Możesz użyć dowolnego kodu fragmenty gotowe do pisania, scenarzysta oraz powiązany z nim kod są nazywane producentem.
Gdy twórca zostaje utworzony i zaczyna pisać w strumieniu (jako aktywny twórca), mówi się, że zablokowanych. W strumieniu z możliwością zapisu może jednocześnie zapisywać tylko jeden twórca. Jeśli chcesz otrzymać kolejną scenarzysty, aby zacząć pisać do swojego strumienia, zwykle musisz go opublikować przed załączeniem wiadomości innego autora.
Wewnętrzna kolejka zawiera informacje o fragmentach, które zostały jeszcze zapisane w strumieniu, ale jeszcze ich nie ma. zostały przetworzone przez bazowe ujście.
Strategia kolejkowania to obiekt określający sposób, w jaki strumień powinien sygnalizować obciążenie wsteczne na podstawie o stanie jej wewnętrznej kolejki. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje łączny rozmiar wszystkich fragmentów w kolejce do określonej liczby, czyli do wysokiego znaku wodnego.
Ostatni konstrukcja nosi nazwę kontrolera. Każdy strumień możliwy do zapisu ma powiązany kontroler, który umożliwia sterowanie transmisją (np. jej przerwanie).
Tworzenie strumienia z możliwością zapisu
Interfejs WritableStream
usługi
interfejs Streams API zapewnia standardową abstrakcję zapisu danych strumieniowanych do miejsca docelowego,
jako zlew. Ten obiekt ma wbudowane funkcje backendu i kolejki. Tworzysz strumień z możliwością zapisu przez
wywoływanie jego konstruktora
WritableStream()
Zawiera opcjonalny parametr underlyingSink
, który reprezentuje obiekt
z metodami i właściwościami, które określają, jak będzie zachowywać się utworzona instancja strumienia.
underlyingSink
underlyingSink
może zawierać następujące opcjonalne metody zdefiniowane przez programistę. controller
do niektórych metod jest parametr
WritableStreamDefaultController
start(controller)
: ta metoda jest wywoływana natychmiast po utworzeniu obiektu. treści tej metody powinny mieć na celu uzyskanie dostępu do bazowego ujścia. Jeśli ma to być asynchronicznie, może zwrócić obietnicę sukcesu lub porażki.write(chunk, controller)
: ta metoda jest wywoływana, gdy nowy fragment danych (określony w parametrzechunk
) jest gotowy do zapisu w bazowym ujściu. Może zwrócić obietnicę powodzenie lub niepowodzenie operacji zapisu. Ta metoda zostanie wywołana dopiero po poprzedniej były zapisywane i nigdy po zamknięciu lub przerwaniu strumienia.close(controller)
: ta metoda jest wywoływana, gdy aplikacja zasygnalizuje, że zakończyła pisanie fragmenty do strumienia. Zawartość powinna wykonywać niezbędne działania, aby zakończyć zapisy w w bazowym ujściu i zwolnić do niego dostęp. Jeśli ten proces jest asynchroniczny, może zwrócić błąd obiecują zasygnalizowanie sukcesu lub porażki. Ta metoda zostanie wywołana dopiero po wszystkich zapisach w kolejce im się udało.abort(reason)
: ta metoda jest wywoływana, gdy aplikacja zasygnalizuje, że chce nagle się zamknąć. i określić jego stan jako błędu. Narzędzie to może wyczyścić wstrzymane zasoby, Funkcjaclose()
, ale funkcjaabort()
zostanie wywołana nawet wtedy, gdy zapisy znajdują się w kolejce. Te kawałki zostaną wrzucone w domu. Jeśli ten proces jest asynchroniczny, może zwrócić obietnicę sukcesu lub niepowodzenia. Parametrreason
zawiera wartośćDOMString
opisującą, dlaczego strumień został przerwany.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
WritableStreamDefaultController
interfejs Streams API reprezentuje kontroler umożliwiający kontrolę stanu WritableStream
podczas tworzenia, podczas przesyłania kolejnych fragmentów do pisania lub na końcu pisania. Podczas tworzenia
a WritableStream
, bazowe ujście otrzymuje odpowiednią wartość WritableStreamDefaultController
do manipulacji. Funkcja WritableStreamDefaultController
ma tylko 1 metodę:
WritableStreamDefaultController.error()
,
co powoduje, że wszystkie przyszłe interakcje z tym strumieniem dają błąd.
Funkcja WritableStreamDefaultController
obsługuje również właściwość signal
, która zwraca instancję
AbortSignal
,
co umożliwia zatrzymanie operacji WritableStream
w razie potrzeby.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Drugim, podobnie opcjonalnym argumentem konstruktora WritableStream()
, jest queuingStrategy
.
Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania strumienia, która składa się z dwóch
parametry:
highWaterMark
: liczba nieujemna wskazująca na wysoki znak wodny strumienia przy użyciu tej strategii kolejkowania.size(chunk)
: funkcja, która oblicza i zwraca skończony, nieujemny rozmiar danej wartości fragmentu. Wynik służy do określania naprężenia wstecznego, które pojawia się za pomocą odpowiedniej właściwościWritableStreamDefaultWriter.desiredSize
.
Metody getWriter()
i write()
Aby pisać w strumieniu z możliwością zapisu, musisz mieć scenarzystę, który będzie
WritableStreamDefaultWriter
Metoda getWriter()
interfejsu WritableStream
zwraca błąd
nową instancję WritableStreamDefaultWriter
i blokuje strumień do tej instancji. Natomiast
strumień jest zablokowany, nie można uzyskać innego zapisującego, dopóki bieżący nie zostanie zwolniony.
write()
funkcji
WritableStreamDefaultWriter
interfejs zapisuje przekazany fragment danych do obiektu WritableStream
i jego ujścia, a następnie zwraca
obietnica oznaczająca powodzenie lub niepowodzenie operacji zapisu. Pamiętaj, że
„sukces” środków zależy od poziomu ich ujścia; może to oznaczać, że fragment został zaakceptowany,
nie musi być bezpiecznie zapisana w ostatecznym miejscu docelowym.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
Właściwość locked
Aby sprawdzić, czy strumień możliwy do zapisu jest zablokowany, otwórz
WritableStream.locked
usłudze.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Przykładowy kod strumienia z możliwością zapisu
Poniższy przykładowy kod pokazuje wszystkie kroki w praktyce.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Przypisanie czytelnego strumienia do strumienia z możliwością zapisu
Czytelny strumień może zostać przesłany do strumienia możliwego do zapisu przez
pipeTo()
.
ReadableStream.pipeTo()
przekształca wartość bieżącą ReadableStream
w dane WritableStream
i zwraca
obietnica spełniająca się, gdy proces potoku zakończy się pomyślnie, lub odrzuca, jeśli wystąpiły błędy
napotkano problem.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Tworzenie strumienia przekształcenia
Interfejs TransformStream
interfejsu Streams API reprezentuje zbiór danych, które można przekształcić. Ty
utworzyć strumień przekształcenia, wywołując jego konstruktor TransformStream()
, który tworzy i zwraca
obiektu strumienia przekształcenia z podanych modułów obsługi. Konstruktor TransformStream()
akceptuje jako
pierwszym argumentem jest opcjonalny obiekt JavaScript reprezentujący obiekt transformer
. Takie obiekty mogą
metody mogą być użyte w tych przypadkach:
transformer
start(controller)
: ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Zwykle służy do umieszczania w kolejce fragmentów prefiksu za pomocą funkcjicontroller.enqueue()
. Te fragmenty zostaną przeczytane z czytelnej strony, ale nie wymagają żadnych zapisów po stronie możliwej do zapisu. Jeśli pierwsza jest asynchroniczny, ponieważ np. uzyskanie fragmentów prefiksu wymaga pewnego wysiłku, funkcja może zwrócić obietnicę sukcesu lub niepowodzenia; odrzucona obietnica spowoduje błąd . Wszystkie zgłoszone wyjątki zostaną ponownie zgłoszone przez konstruktorTransformStream()
.transform(chunk, controller)
: ta metoda jest wywoływana, gdy nowy fragment został pierwotnie zapisany w dostępna do zapisu strona jest gotowa do przekształcenia. Implementacja strumienia gwarantuje, że ta funkcja jest wywoływana dopiero po tym, jak poprzednie przekształcenia zakończą się sukcesem. Funkcjastart()
nigdy nie miała lub po wywołaniu funkcjiflush()
. Ta funkcja wykonuje rzeczywistą przekształcenie dla strumienia przekształcania. Może dodać wyniki do kolejki za pomocą funkcjicontroller.enqueue()
. Ten umożliwia zapis pojedynczego fragmentu po stronie możliwej do zapisu. W przypadku otrzymania parametru czytelna strona w zależności od tego, ile razy funkcjacontroller.enqueue()
została wywołana. Jeśli proces transformacji jest asynchroniczne, ta funkcja może zwrócić obietnicę sukcesu lub niepowodzenia do jej przekształcenia. Odrzucona obietnica spowoduje błąd zarówno na czytelnej, jak i zapisie przekształcenia. Jeśli nie podano metodytransform()
, używane jest przekształcenie tożsamości, które dodaje do kolejki fragmenty niezmienione od strony możliwej do zapisu na czytelną stronę.flush(controller)
: ta metoda jest wywoływana po tym, jak wszystkie fragmenty zapisane po stronie możliwej do zapisu zostały zostanie przekształcone przez pomyślne przejście przeztransform()
, a strona do zapisu ma zostać zamknięto. Zwykle służy to do umieszczania w kolejce fragmentów sufiksu po czytelniczej stronie. zostanie zamknięte. Jeśli proces opróżniania jest asynchroniczny, funkcja może zwrócić obietnicę sygnał o powodzeniu lub niepowodzeniu sygnału; wynik zostanie przekazany do urządzenia wywołującegostream.writable.write()
Dodatkowo odrzucona obietnica powoduje błędy zarówno w czytelnym, z drugiej strony strumienia. Zgłoszenie wyjątku jest traktowane tak samo jak zwrócenie odrzuconego lub obiecywanie.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Strategie kolejkowania: writableStrategy
i readableStrategy
Drugi i trzeci opcjonalny parametr konstruktora TransformStream()
są opcjonalne
Strategie kolejkowania: writableStrategy
i readableStrategy
. Są one zdefiniowane zgodnie z
Odczytujący i zapisywalny strumień
sekcji.
Przykładowy kod strumienia przekształcania
Poniższy przykładowy kod pokazuje prosty strumień przekształcenia w działaniu.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Potokowanie czytelnego strumienia w strumieniu przekształcenia
pipeThrough()
metoda interfejsu ReadableStream
zapewnia łańcuchowy sposób dostarczania potoku w przypadku bieżącego strumienia
za pomocą strumienia przekształcenia lub dowolnej innej możliwej do zapisu/odczytu pary. Przesyłanie strumienia zwykle jest blokowane
przez cały czas jej trwania, uniemożliwiając innym czytelnikom zablokowanie go.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Następny przykładowy (nieco zmodyfikowany) kod pokazuje, jak zaimplementować funkcję „krzyki” wersja systemu fetch()
w którym cały tekst jest pisany wielkimi literami dzięki zastosowaniu zwróconej obietnicy odpowiedzi
jako strumień
i używając wielkich liter we fragmencie. Zaletą tego rozwiązania jest to, że nie trzeba czekać,
cały dokument, co może mieć ogromne znaczenie przy pracy z dużymi plikami.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Prezentacja
Poniższa demonstracja pokazuje działanie strumieni czytelnych, możliwych do zapisu i przekształconych. Zawiera też przykłady
pipeThrough()
i pipeTo()
, a także pokazuje tee()
. Opcjonalnie możesz uruchomić
prezentację w osobnym oknie lub wyświetl
kodu źródłowego.
Użyteczne strumienie dostępne w przeglądarce
W przeglądarce jest wiele przydatnych strumieni. Możesz łatwo utworzyć
ReadableStream
z bloba. Blob
zwraca metodę stream() interfejsu
ReadableStream
, który po odczytaniu zwraca dane zawarte w obiekcie blob. Pamiętaj też, że
Obiekt File
jest konkretnym rodzajem
Blob
i można go używać w dowolnym kontekście, w jakim może działać obiekt blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Warianty strumienia TextDecoder.decode()
i TextEncoder.encode()
są nazywane
TextDecoderStream
i
TextEncoderStream
.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Kompresja lub dekompresja pliku jest łatwa dzięki
CompressionStream
i
Strumienie przekształceń: DecompressionStream
. Poniższy przykładowy kod pokazuje, jak pobrać specyfikację strumieni i skompresować ją (gzip)
bezpośrednio w przeglądarce i zapisać skompresowany plik bezpośrednio na dysku.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
Interfejs File System Access API
FileSystemWritableFileStream
.
a eksperymentalne strumienie żądań fetch()
są
przykłady strumieni dostępnych do zapisu.
Interfejs Serial API często korzysta zarówno z czytelnych, jak i zapisujących strumieni.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Interfejs WebSocketStream
API integruje strumienie z interfejsem WebSocket API.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Przydatne materiały
- Specyfikacja strumieni
- Dołączone wersje demonstracyjne
- Kod polyfill dla strumieni
- 2016 – rok strumieni danych z sieci
- Iteratory i generatory asynchroniczne
- Wizualizacja do transmisji na żywo
Podziękowania
Ten artykuł zrecenzował(a) Jake Archibald François Beaufort Sam Dutton Mattias Buelens Surma Joe Medley Adam Rice. Posty na blogu Jake'a Archibalda bardzo mi pomogły strumienie. Część przykładowego kodu jest inspirowana przez użytkownika GitHuba eksploracji użytkownika @bellbind oraz opiera się w dużej mierze na Dokumenty internetowe MDN w strumieniach Streams Standard autorzy wykonali ogromną pracę tę specyfikację. Baner powitalny: Ryan Lara w Odchylenie z ekranu.