Streams – der ultimative Leitfaden

Informationen zum Verwenden lesbarer, beschreibbarer und transformierbarer Streams mit der Streams API

Mit der Streams API können Sie programmatisch auf Datenstreams zugreifen, die über das Netzwerk empfangen oder auf andere Weise lokal erstellt wurden, und sie mit JavaScript verarbeiten. Beim Streaming wird eine Ressource, die Sie empfangen, senden oder transformieren möchten, in kleine Blöcke zerlegt und dann bitweise verarbeitet. Streaming ist zwar eine Funktion, die Browser ohnehin nutzen, wenn sie Assets wie HTML oder Videos empfangen, die auf Webseiten angezeigt werden sollen, aber diese Funktion war für JavaScript nie verfügbar, bevor 2015 fetch mit Streams eingeführt wurde.

Bisher mussten Sie, wenn Sie eine Ressource verarbeiten wollten (z. B. ein Video oder eine Textdatei), die gesamte Datei herunterladen, warten, bis sie in ein geeignetes Format deserialisiert wurde, und sie dann verarbeiten. Mit Streams, die für JavaScript verfügbar sind, ändert sich das. Sie können jetzt Rohdaten mit JavaScript schrittweise verarbeiten, sobald sie auf dem Client verfügbar sind, ohne einen Puffer, String oder Blob generieren zu müssen. Dies eröffnet eine Reihe von Anwendungsfällen, von denen einige unten aufgeführt sind:

  • Videoeffekte:Ein lesbarer Videostream wird durch einen Transformationsstream geleitet, der Effekte in Echtzeit anwendet.
  • Datenkomprimierung/-dekomprimierung: Ein Dateistream wird durch einen Transformationsstream geleitet, der ihn selektiv komprimiert bzw. dekomprimiert.
  • Bilddekodierung: Ein HTTP-Antwortstream wird durch einen Transformationsstream geleitet, der Bytes in Bitmap-Daten decodiert, und dann durch einen weiteren Transformationsstream, der Bitmaps in PNGs umwandelt. Wenn sie im fetch-Handler eines Service Workers installiert ist, können Sie neue Bildformate wie AVIF transparent polyfillen.

Unterstützte Browser

ReadableStream und WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

Wichtige Konzepte

Bevor ich auf die verschiedenen Arten von Streams eingehe, möchte ich einige grundlegende Konzepte vorstellen.

Chunks

Ein Chunk ist ein einzelnes Datenelement, das in einen Stream geschrieben oder daraus gelesen wird. Sie können jeden Typ haben und sogar Blöcke verschiedener Typen enthalten. In den meisten Fällen ist ein Chunk nicht die kleinste Dateneinheit für einen bestimmten Stream. Ein Byte-Stream kann beispielsweise statt einzelner Byte-Chunks aus 16 KiB Uint8Array-Einheiten bestehen.

Lesbare Streams

Ein lesbarer Stream ist eine Datenquelle, aus der Sie Daten lesen können. Mit anderen Worten: Daten fließen aus einem lesbaren Stream. Ein lesbarer Stream ist konkret eine Instanz der Klasse ReadableStream.

Beschreibbare Streams

Ein beschreibbarer Stream ist ein Ziel für Daten, in das Sie schreiben können. Mit anderen Worten: Daten werden in einen beschreibbaren Stream geleitet. Ein beschreibbarer Stream ist konkret eine Instanz der Klasse WritableStream.

Streams transformieren

Ein Transformierungsstream besteht aus einem Streampaar: einem schreibbaren Stream, der als Schreibseite bezeichnet wird, und einem lesbaren Stream, der als Leseseite bezeichnet wird. Eine reale Metapher dafür wäre ein Simultandolmetscher, der spontan von einer Sprache in eine andere übersetzt. Wenn auf die beschreibbare Seite geschrieben wird, werden auf transformspezifische Weise neue Daten zum Lesen auf der lesbaren Seite verfügbar gemacht. Konkret kann jedes Objekt mit einer writable- und einer readable-Property als Transformierungsstream dienen. Mit der Standard-TransformStream-Klasse ist es jedoch einfacher, ein solches Paar zu erstellen, das richtig verdrillt ist.

Rohrketten

Streams werden hauptsächlich durch Pipes miteinander verbunden. Ein lesbarer Stream kann mithilfe der Methode pipeTo() des lesbaren Streams direkt an einen beschreibbaren Stream weitergeleitet werden. Er kann auch zuerst mithilfe der Methode pipeThrough() des lesbaren Streams durch einen oder mehrere Transformierungsstreams weitergeleitet werden. Ein Set von Streams, die auf diese Weise zusammengeführt werden, wird als Pipe-Chain bezeichnet.

Gegendruck

Sobald eine Pipe-Kette erstellt wurde, werden Signale darüber übertragen, wie schnell Chunks durch sie fließen sollen. Wenn ein Schritt in der Kette noch keine Chunks akzeptieren kann, wird ein Signal rückwärts durch die Pipe-Kette gesendet, bis die ursprüngliche Quelle aufgefordert wird, nicht mehr so schnell Chunks zu generieren. Dieser Vorgang, bei dem der Durchfluss normalisiert wird, wird als Gegendruck bezeichnet.

Abschlag

Ein lesbarer Stream kann mithilfe der tee()-Methode geteed (benannt nach der Form eines großen „T“) werden. Dadurch wird der Stream gesperrt, d. h., er kann nicht mehr direkt verwendet werden. Es werden jedoch zwei neue Streams erstellt, sogenannte Branches, die unabhängig voneinander genutzt werden können. Das Teeing ist auch wichtig, weil Streams nicht zurückgespult oder neu gestartet werden können. Mehr dazu später.

Diagramm einer Pipe-Kette, die aus einem lesbaren Stream besteht, der von einem Aufruf der Fetch API stammt und dann durch einen Transformierungsstream geleitet wird, dessen Ausgabe aufgeteilt und dann an den Browser für den ersten resultierenden lesbaren Stream und an den Service Worker-Cache für den zweiten resultierenden lesbaren Stream gesendet wird.
Eine Pipechain.

Funktionsweise eines lesbaren Streams

Ein lesbarer Stream ist eine Datenquelle, die in JavaScript durch ein ReadableStream-Objekt dargestellt wird, das aus einer zugrunde liegenden Quelle stammt. Der Konstruktor von ReadableStream() erstellt ein lesbares Streamobjekt aus den angegebenen Handlers und gibt es zurück. Es gibt zwei Arten von zugrunde liegenden Quellen:

  • Push-Quellen senden Ihnen ständig Daten, wenn Sie darauf zugegriffen haben. Sie müssen den Zugriff auf den Stream starten, pausieren oder beenden. Beispiele sind Live-Videostreams, servergesendete Ereignisse oder WebSockets.
  • Bei Pull-Quellen müssen Sie Daten ausdrücklich anfordern, sobald eine Verbindung hergestellt wurde. Beispiele hierfür sind HTTP-Vorgänge über fetch()- oder XMLHttpRequest-Aufrufe.

Streamdaten werden sequenziell in kleinen Teilen gelesen, die als Chunks bezeichnet werden. Die in einem Stream platzierten Chunks werden in die Warteschlange gestellt. Das bedeutet, dass sie in einer Warteschlange warten und gelesen werden können. Eine interne Warteschlange überwacht die noch nicht gelesenen Blöcke.

Eine Warteschlangenstrategie ist ein Objekt, das festlegt, wie ein Stream Backpressure basierend auf dem Status seiner internen Warteschlange signalisieren soll. Die Warteschlangenstrategie weist jedem Block eine Größe zu und vergleicht die Gesamtgröße aller Blöcke in der Warteschlange mit einer bestimmten Zahl, der sogenannten High-Water-Mark.

Die Chunks im Stream werden von einem Reader gelesen. Dieser Leser ruft die Daten jeweils in einem Block ab, sodass Sie beliebige Vorgänge darauf ausführen können. Der Leser und der zugehörige Verarbeitungscode werden als Nutzer bezeichnet.

Das nächste Konstrukt in diesem Kontext wird als Controller bezeichnet. Jedem lesbaren Stream ist ein Controller zugeordnet, mit dem Sie den Stream steuern können.

Ein Stream kann immer nur von einem Leser gelesen werden. Wenn ein Leser erstellt wird und mit dem Lesen eines Streams beginnt (d. h. ein aktiver Leser wird), wird er für diesen Stream gesperrt. Wenn ein anderer Leser den Stream übernehmen soll, musst du in der Regel zuerst den ersten Leser freigeben, bevor du etwas anderes tust. Du kannst aber auch Streams teeen.

Lesbaren Stream erstellen

Sie erstellen einen lesbaren Stream, indem Sie seinen Konstruktor ReadableStream() aufrufen. Der Konstruktor hat ein optionales Argument underlyingSource, das ein Objekt mit Methoden und Eigenschaften darstellt, die festlegen, wie sich die erstellte Streaminstanz verhalten soll.

underlyingSource

Dazu können die folgenden optionalen, vom Entwickler definierten Methoden verwendet werden:

  • start(controller): Wird sofort nach dem Erstellen des Objekts aufgerufen. Die Methode kann auf die Streamquelle zugreifen und alle anderen erforderlichen Schritte zur Einrichtung der Streamfunktion ausführen. Wenn dieser Vorgang asynchron erfolgen soll, kann die Methode ein Versprechen zurückgeben, um Erfolg oder Misserfolg zu signalisieren. Der an diese Methode übergebene Parameter controller ist ein ReadableStreamDefaultController.
  • pull(controller): Kann verwendet werden, um den Stream zu steuern, während weitere Chunks abgerufen werden. Sie wird wiederholt aufgerufen, solange die interne Chunk-Warteschlange des Streams nicht voll ist, bis die Warteschlange ihren Höchststand erreicht. Wenn das Ergebnis des Aufrufs von pull() ein Versprechen ist, wird pull() erst wieder aufgerufen, wenn dieses Versprechen erfüllt ist. Wenn das Versprechen abgelehnt wird, wird der Stream fehlerhaft.
  • cancel(reason): Wird aufgerufen, wenn der Stream-Nutzer den Stream beendet.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController unterstützt die folgenden Methoden:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Das zweite, ebenfalls optionale Argument des ReadableStream()-Konstruktors ist queuingStrategy. Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert. Es nimmt zwei Parameter an:

  • highWaterMark: Eine positive Zahl, die den Höchststand des Streams bei Verwendung dieser Warteschlangenstrategie angibt.
  • size(chunk): Eine Funktion, die die endliche, nicht negative Größe des angegebenen Chunk-Werts berechnet und zurückgibt. Das Ergebnis wird verwendet, um den Gegendruck zu bestimmen, der über die entsprechende ReadableStreamDefaultController.desiredSize-Property angezeigt wird. Außerdem wird damit festgelegt, wann die pull()-Methode der zugrunde liegenden Quelle aufgerufen wird.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Die Methoden getReader() und read()

Um aus einem lesbaren Stream zu lesen, benötigen Sie einen Reader, also einen ReadableStreamDefaultReader. Die Methode getReader() der Schnittstelle ReadableStream erstellt einen Leser und sperrt den Stream für ihn. Solange der Stream gesperrt ist, kann kein anderer Leser erworben werden, bis dieser freigegeben wird.

Die Methode read() der ReadableStreamDefaultReader-Schnittstelle gibt ein Versprechen zurück, das Zugriff auf den nächsten Teil in der internen Warteschlange des Streams gewährt. Je nach Status des Streams wird der Vorgang erfüllt oder abgelehnt. Es gibt folgende Möglichkeiten:

  • Wenn ein Chunk verfügbar ist, wird das Versprechen mit einem Objekt vom Typ
    { value: chunk, done: false } erfüllt.
  • Wenn der Stream geschlossen wird, wird das Versprechen mit einem Objekt vom Typ
    { value: undefined, done: true } erfüllt.
  • Wenn beim Stream ein Fehler auftritt, wird das Versprechen mit dem entsprechenden Fehler abgelehnt.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Die Property locked

Sie können prüfen, ob ein lesbarer Stream gesperrt ist, indem Sie auf seine ReadableStream.locked-Property zugreifen.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Codebeispiele für lesbare Streams

Im folgenden Codebeispiel werden alle Schritte veranschaulicht. Sie erstellen zuerst eine ReadableStream, die im underlyingSource-Argument (d. h. in der TimestampSource-Klasse) eine start()-Methode definiert. Mit dieser Methode wird der controller des Streams angewiesen, alle zehn Sekunden einen Zeitstempel zu enqueue(). Schließlich wird der Controller angewiesen, den Stream close() zu senden. Sie können diesen Stream nutzen, indem Sie über die Methode getReader() einen Leser erstellen und read() aufrufen, bis der Stream done ist.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Asynchrone Iteration

Es ist nicht die praktischste API, bei jeder read()-Schleifeniteration zu prüfen, ob der Stream done ist. Glücklicherweise gibt es bald eine bessere Möglichkeit: die asynchrone Iteration.

for await (const chunk of stream) {
  console.log(chunk);
}

Eine Möglichkeit, die asynchrone Iteration derzeit zu verwenden, besteht darin, das Verhalten mit einer Polyfill zu implementieren.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Lesbaren Stream erstellen

Die Methode tee() der ReadableStream-Schnittstelle teilt den aktuellen lesbaren Stream und gibt ein Array mit zwei Elementen zurück, das die beiden resultierenden Verzweigungen als neue ReadableStream-Instanzen enthält. So können zwei Leser einen Stream gleichzeitig lesen. Das kann beispielsweise in einem Service Worker geschehen, wenn Sie eine Antwort vom Server abrufen und an den Browser streamen, aber auch an den Service Worker-Cache. Da ein Antworttext nicht mehrmals verwendet werden kann, benötigen Sie dazu zwei Kopien. Wenn Sie den Stream kündigen möchten, müssen Sie beide resultierenden Verzweigungen kündigen. Wenn Sie einen Stream teeen, wird er in der Regel für die Dauer der Ausführung gesperrt, sodass andere Leser ihn nicht sperren können.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Lesbare Byte-Streams

Für Streams, die Bytes darstellen, wird eine erweiterte Version des lesbaren Streams bereitgestellt, um Bytes effizient zu verarbeiten, insbesondere durch Minimierung von Kopien. Mit Bytestreams können Leser mit BYOB-Puffer (Bring Your Own Buffer) erworben werden. Die Standardimplementierung kann eine Reihe verschiedener Ausgaben liefern, z. B. Strings oder Array-Buffer im Fall von WebSockets. Bei Bytestreams ist dagegen eine Byteausgabe garantiert. Außerdem bieten BYOB-Lesegeräte Vorteile in Bezug auf die Stabilität. Wenn ein Puffer getrennt wird, kann sichergestellt werden, dass nicht zweimal in denselben Puffer geschrieben wird, wodurch Race-Bedingungen vermieden werden. Mit BYOB-Lesern kann die Häufigkeit, mit der der Browser die Garbage Collection ausführen muss, reduziert werden, da Puffer wiederverwendet werden können.

Lesbaren Bytestream erstellen

Sie können einen lesbaren Bytestream erstellen, indem Sie dem Konstruktor von ReadableStream() einen zusätzlichen type-Parameter übergeben.

new ReadableStream({ type: 'bytes' });

underlyingSource

Die zugrunde liegende Quelle eines lesbaren Byte-Streams erhält eine ReadableByteStreamController, die manipuliert werden soll. Die Methode ReadableByteStreamController.enqueue() nimmt ein chunk-Argument an, dessen Wert eine ArrayBufferView ist. Die Property ReadableByteStreamController.byobRequest gibt den aktuellen BYOB-Pull-Request zurück oder „null“, wenn es keinen gibt. Die Eigenschaft ReadableByteStreamController.desiredSize gibt schließlich die gewünschte Größe zurück, um die interne Warteschlange des gesteuerten Streams zu füllen.

queuingStrategy

Das zweite, ebenfalls optionale Argument des ReadableStream()-Konstruktors ist queuingStrategy. Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert. Es nimmt einen Parameter an:

  • highWaterMark: Eine nicht negative Anzahl von Byte, die den Höchststand des Streams bei Verwendung dieser Warteschlangenstrategie angibt. Damit wird der Gegendruck bestimmt, der über die entsprechende ReadableByteStreamController.desiredSize-Property angezeigt wird. Außerdem wird damit festgelegt, wann die pull()-Methode der zugrunde liegenden Quelle aufgerufen wird.

Die Methoden getReader() und read()

Sie können dann auf eine ReadableStreamBYOBReader zugreifen, indem Sie den Parameter mode entsprechend festlegen: ReadableStream.getReader({ mode: "byob" }). So lässt sich die Pufferzuweisung genauer steuern, um Kopien zu vermeiden. Wenn Sie aus dem Bytestream lesen möchten, müssen Sie ReadableStreamBYOBReader.read(view) aufrufen, wobei view ein ArrayBufferView ist.

Codebeispiel für lesbaren Bytestream

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Die folgende Funktion gibt lesbare Bytestreams zurück, die ein effizientes Lesen eines zufällig generierten Arrays ohne Kopiervorgang ermöglichen. Anstatt eine vordefinierte Blockgröße von 1.024 zu verwenden,wird versucht, den vom Entwickler bereitgestellten Puffer zu füllen, was eine vollständige Kontrolle ermöglicht.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Die Funktionsweise eines beschreibbaren Streams

Ein beschreibbarer Stream ist ein Ziel, in das Sie Daten schreiben können, die in JavaScript durch ein WritableStream-Objekt dargestellt werden. Dies dient als Abstraktion über einem untergeordneten Sink, einem I/O-Sink auf niedrigerer Ebene, in den Rohdaten geschrieben werden.

Die Daten werden über einen Writer nach und nach in den Stream geschrieben. Ein Block kann viele Formen haben, genau wie die Blöcke in einem Reader. Sie können beliebigen Code verwenden, um die Chunks für das Schreiben zu erstellen. Der Writer und der zugehörige Code werden als Producer bezeichnet.

Wenn ein Writer erstellt wird und mit dem Schreiben in einen Stream beginnt (ein aktiver Writer), wird er für diesen Stream gesperrt. In einen beschreibbaren Stream kann jeweils nur ein Writer schreiben. Wenn ein anderer Writer auf deinen Stream schreiben soll, musst du ihn in der Regel freigeben, bevor du einen anderen Writer zuweist.

Eine interne Warteschlange überwacht die Chunks, die in den Stream geschrieben, aber noch nicht vom zugrunde liegenden Sink verarbeitet wurden.

Eine Warteschlangenstrategie ist ein Objekt, das festlegt, wie ein Stream Backpressure basierend auf dem Status seiner internen Warteschlange signalisieren soll. Die Warteschlangenstrategie weist jedem Block eine Größe zu und vergleicht die Gesamtgröße aller Blöcke in der Warteschlange mit einer bestimmten Zahl, der sogenannten High-Water-Mark.

Das finale Konstrukt wird als Controller bezeichnet. Jedem beschreibbaren Stream ist ein Controller zugeordnet, mit dem Sie den Stream steuern können, z. B. abbrechen.

Schreibbaren Stream erstellen

Die Schnittstelle WritableStream der Streams API bietet eine Standardabstraktion zum Schreiben von Streamingdaten an ein Ziel, das als „Sink“ bezeichnet wird. Dieses Objekt bietet integrierte Rückstau- und Warteschlangenfunktionen. Sie erstellen einen beschreibbaren Stream, indem Sie seinen Konstruktor WritableStream() aufrufen. Es hat einen optionalen Parameter underlyingSink, der ein Objekt mit Methoden und Eigenschaften darstellt, die das Verhalten der erstellten Streaminstanz definieren.

underlyingSink

Die underlyingSink kann die folgenden optionalen, vom Entwickler definierten Methoden enthalten. Der Parameter controller, der an einige der Methoden übergeben wird, ist ein WritableStreamDefaultController.

  • start(controller): Diese Methode wird sofort nach dem Erstellen des Objekts aufgerufen. Der Inhalt dieser Methode sollte darauf abzielen, Zugriff auf den zugrunde liegenden Datenablauf zu erhalten. Wenn dieser Vorgang asynchron ausgeführt werden soll, kann er ein Versprechen zurückgeben, um Erfolg oder Misserfolg anzuzeigen.
  • write(chunk, controller): Diese Methode wird aufgerufen, wenn ein neuer Datenblock (im Parameter chunk angegeben) in den zugrunde liegenden Datensammler geschrieben werden kann. Sie kann ein Versprechen zurückgeben, um den Erfolg oder Misserfolg des Schreibvorgangs anzuzeigen. Diese Methode wird nur aufgerufen, nachdem vorherige Schreibvorgänge erfolgreich waren, und nie, nachdem der Stream geschlossen oder abgebrochen wurde.
  • close(controller): Diese Methode wird aufgerufen, wenn die App signalisiert, dass das Schreiben von Chunks in den Stream abgeschlossen ist. Der Inhalt sollte alles tun, was erforderlich ist, um die Schreibvorgänge in der zugrunde liegenden Senke abzuschließen und den Zugriff darauf freizugeben. Wenn dieser Vorgang asynchron ist, kann er ein Versprechen zurückgeben, um Erfolg oder Fehlschlag zu signalisieren. Diese Methode wird erst aufgerufen, nachdem alle in der Warteschlange befindlichen Schreibvorgänge erfolgreich waren.
  • abort(reason): Diese Methode wird aufgerufen, wenn die App signalisiert, dass sie den Stream abrupt schließen und in einen Fehlerstatus versetzen möchte. Es kann alle gehaltenen Ressourcen bereinigen, ähnlich wie close(). abort() wird jedoch auch dann aufgerufen, wenn Schreibvorgänge in die Warteschlange gestellt werden. Diese werden verworfen. Wenn dieser Vorgang asynchron ist, kann er ein Versprechen zurückgeben, um Erfolg oder Fehlschlag zu signalisieren. Der Parameter reason enthält einen DOMString, der beschreibt, warum der Stream abgebrochen wurde.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Die WritableStreamDefaultController-Schnittstelle der Streams API stellt einen Controller dar, mit dem der Status eines WritableStream während der Einrichtung gesteuert werden kann, wenn weitere Chunks zum Schreiben gesendet werden oder am Ende des Schreibens. Beim Erstellen einer WritableStream wird dem zugrunde liegenden Sink eine entsprechende WritableStreamDefaultController-Instanz zugewiesen, die manipuliert werden soll. Der WritableStreamDefaultController hat nur eine Methode: WritableStreamDefaultController.error(). Dies führt dazu, dass zukünftige Interaktionen mit dem zugehörigen Stream zu Fehlern führen. WritableStreamDefaultController unterstützt auch das Attribut signal, das eine Instanz von AbortSignal zurückgibt. So kann ein WritableStream-Vorgang bei Bedarf beendet werden.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Das zweite, ebenfalls optionale Argument des WritableStream()-Konstruktors ist queuingStrategy. Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert. Es nimmt zwei Parameter an:

  • highWaterMark: Eine positive Zahl, die den Höchststand des Streams bei Verwendung dieser Warteschlangenstrategie angibt.
  • size(chunk): Eine Funktion, die die endliche, nicht negative Größe des angegebenen Chunk-Werts berechnet und zurückgibt. Das Ergebnis wird verwendet, um den Gegendruck zu bestimmen, der über die entsprechende WritableStreamDefaultWriter.desiredSize-Property angezeigt wird.

Die Methoden getWriter() und write()

Um in einen beschreibbaren Stream zu schreiben, benötigen Sie einen Writer, also eine WritableStreamDefaultWriter. Die getWriter()-Methode der WritableStream-Schnittstelle gibt eine neue Instanz von WritableStreamDefaultWriter zurück und sperrt den Stream für diese Instanz. Solange der Stream gesperrt ist, kann kein anderer Writer erworben werden, bis der aktuelle freigegeben wird.

Die write()-Methode der WritableStreamDefaultWriter-Schnittstelle schreibt einen übergebenen Datenblock in eine WritableStream und die zugrunde liegende Senke und gibt dann ein Versprechen zurück, das den Erfolg oder Misserfolg des Schreibvorgangs angibt. Was „Erfolg“ bedeutet, hängt vom zugrunde liegenden Sink ab. Es kann bedeuten, dass der Chunk akzeptiert wurde, nicht unbedingt, dass er sicher an seinem endgültigen Ziel gespeichert wurde.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Die Property locked

Sie können prüfen, ob ein beschreibbarer Stream gesperrt ist, indem Sie auf seine Eigenschaft WritableStream.locked zugreifen.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Codebeispiel für einen beschreibbaren Stream

Im folgenden Codebeispiel werden alle Schritte veranschaulicht.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Piping eines lesbaren Streams an einen beschreibbaren Stream

Ein lesbarer Stream kann über die Methode pipeTo() an einen beschreibbaren Stream weitergeleitet werden. ReadableStream.pipeTo() leitet die aktuelle ReadableStream an eine bestimmte WritableStream weiter und gibt ein Versprechen zurück, das erfüllt wird, wenn der Pipe-Vorgang erfolgreich abgeschlossen wurde, oder abgelehnt wird, wenn Fehler aufgetreten sind.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Transformationsstream erstellen

Die TransformStream-Schnittstelle der Streams API stellt eine Reihe von transformierbaren Daten dar. Sie erstellen einen Transformierungsstream, indem Sie den Konstruktor TransformStream() aufrufen. Dadurch wird ein Transformierungsstreamobjekt aus den angegebenen Handlers erstellt und zurückgegeben. Der Konstruktor TransformStream() akzeptiert als erstes Argument ein optionales JavaScript-Objekt, das die transformer darstellt. Solche Objekte können eine der folgenden Methoden enthalten:

transformer

  • start(controller): Diese Methode wird sofort nach dem Erstellen des Objekts aufgerufen. Normalerweise werden damit Präfix-Chunks mit controller.enqueue() in die Warteschlange gestellt. Diese werden von der lesbaren Seite gelesen, sind aber nicht von Schreibvorgängen auf der beschreibbaren Seite abhängig. Wenn dieser anfängliche Vorgang asynchron ist, z. B. weil es einige Mühe kostet, die Prefix-Chunks zu erhalten, kann die Funktion ein Versprechen zurückgeben, um Erfolg oder Misserfolg zu signalisieren. Ein abgelehntes Versprechen führt zu einem Fehler im Stream. Alle geworfenen Ausnahmen werden vom TransformStream()-Konstruktor noch einmal geworfen.
  • transform(chunk, controller): Diese Methode wird aufgerufen, wenn ein neuer Chunk, der ursprünglich auf die beschreibbare Seite geschrieben wurde, zur Transformation bereit ist. Die Streamimplementierung sorgt dafür, dass diese Funktion nur aufgerufen wird, nachdem die vorherigen Transformationen erfolgreich waren, und nie vor Abschluss von start() oder nach Aufruf von flush(). Diese Funktion führt die eigentliche Transformation des Transformstreams aus. Die Ergebnisse können mit controller.enqueue() in die Warteschlange gestellt werden. So kann ein einzelner Chunk, der auf die beschreibbare Seite geschrieben wird, zu null oder mehreren Chunks auf der lesbaren Seite führen, je nachdem, wie oft controller.enqueue() aufgerufen wird. Wenn die Transformation asynchron ist, kann diese Funktion ein Versprechen zurückgeben, um den Erfolg oder Misserfolg der Transformation anzuzeigen. Ein abgelehntes Versprechen führt zu Fehlern auf der lesbaren und der beschreibbaren Seite des Transformationsstreams. Wenn keine transform()-Methode angegeben ist, wird die Identitätstransformation verwendet, bei der die Blöcke unverändert von der beschreibbaren Seite in die lesbare Warteschlange gestellt werden.
  • flush(controller): Diese Methode wird aufgerufen, nachdem alle auf die beschreibbare Seite geschriebenen Chunks transformiert wurden, indem sie transform() erfolgreich durchlaufen haben, und die beschreibbare Seite kurz vor dem Schließen steht. Normalerweise werden damit Suffix-Chunks in der lesbaren Seite anstehen gelassen, bevor auch diese geschlossen wird. Wenn der Löschvorgang asynchron ist, kann die Funktion ein Versprechen zurückgeben, um Erfolg oder Misserfolg anzuzeigen. Das Ergebnis wird an den Aufrufer von stream.writable.write() gesendet. Außerdem führt ein abgelehntes Versprechen zu Fehlern sowohl auf der lesbaren als auch auf der beschreibbaren Seite des Streams. Das Auslösen einer Ausnahme wird genauso behandelt wie das Zurückgeben eines abgelehnten Versprechens.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Die Warteschlangenstrategien writableStrategy und readableStrategy

Die beiden optionalen Parameter des Konstruktors TransformStream() sind die optionalen writableStrategy- und readableStrategy-Warteschlangenstrategien. Sie werden wie in den Abschnitten readable (lesbar) und writable (schreibbar) beschrieben definiert.

Codebeispiel für die Transformation von Streams

Das folgende Codebeispiel zeigt einen einfachen Transformationsstream in Aktion.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Einen lesbaren Stream durch einen Transformationsstream leiten

Die Methode pipeThrough() der ReadableStream-Schnittstelle bietet eine verkettbare Möglichkeit, den aktuellen Stream durch einen Transformationsstream oder ein anderes Schreib-/Lesepaar zu leiten. Wenn ein Stream gepipet wird, wird er in der Regel für die Dauer der Pipe gesperrt, sodass andere Leser ihn nicht sperren können.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Das nächste Codebeispiel (etwas konstruiert) zeigt, wie Sie eine „Schreibmaschinen“-Version von fetch() implementieren können, bei der der gesamte Text in Großbuchstaben geschrieben wird, indem das zurückgegebene Antwortversprechen als Stream verwendet und der Text Chunk für Chunk in Großbuchstaben geschrieben wird. Der Vorteil dieses Ansatzes besteht darin, dass Sie nicht warten müssen, bis das gesamte Dokument heruntergeladen wurde. Das kann bei großen Dateien einen großen Unterschied machen.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

In der Demo unten werden lesbare, beschreibbare und transformierte Streams in Aktion gezeigt. Außerdem enthält es Beispiele für pipeThrough()- und pipeTo()-Pipechains sowie eine Demonstration von tee(). Sie können die Demo optional in einem eigenen Fenster ausführen oder sich den Quellcode ansehen.

Nützliche Streams, die im Browser verfügbar sind

Im Browser sind eine Reihe nützlicher Streams integriert. Sie können ganz einfach einen ReadableStream aus einem Blob erstellen. Die Methode stream() der Schnittstelle Blob gibt einen ReadableStream zurück, der beim Lesen die im Blob enthaltenen Daten zurückgibt. Denken Sie auch daran, dass ein File-Objekt eine bestimmte Art von Blob ist und in jedem Kontext verwendet werden kann, in dem auch ein Blob verwendet werden kann.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Die Streamingvarianten von TextDecoder.decode() und TextEncoder.encode() heißen TextDecoderStream bzw. TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Mit den Transformstreams CompressionStream und DecompressionStream können Sie Dateien ganz einfach komprimieren oder dekomprimieren. Im folgenden Codebeispiel wird gezeigt, wie du die Streams-Spezifikation herunterladen, direkt im Browser komprimieren (gzip) und die komprimierte Datei direkt auf die Festplatte schreiben kannst.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream der File System Access API und die experimentellen fetch()-Anfragestreams sind Beispiele für beschreibbare Streams in der Praxis.

Die Serial API nutzt sowohl lesbare als auch beschreibbare Streams.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Schließlich werden mit der WebSocketStream API Streams in die WebSocket API eingebunden.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Nützliches Infomaterial

Danksagungen

Dieser Artikel wurde von Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley und Adam Rice geprüft. Die Blogbeiträge von Jake Archibald haben mir sehr geholfen, Streams zu verstehen. Einige der Codebeispiele wurden von den Experimenten des GitHub-Nutzers @bellbind inspiriert. Teile des Texts basieren stark auf den MDN Web Docs zu Streams. Die Autoren des Streams-Standards haben hervorragende Arbeit bei der Erstellung dieser Spezifikation geleistet. Hero-Image von Ryan Lara auf Unsplash.