Streams – der ultimative Leitfaden

Informationen zum Lesen, Schreiben und Transformieren von Streams mit der Streams API

Mit der Streams API können Sie programmatisch auf Datenströme zugreifen, die über das Netzwerk empfangen oder lokal erstellt wurden, und sie mit JavaScript verarbeiten. Beim Streaming wird eine Ressource, die Sie empfangen, senden oder transformieren möchten, in kleine Blöcke zerlegt und dann bitweise verarbeitet. Streaming ist zwar eine Funktion, die Browser ohnehin nutzen, wenn sie Assets wie HTML oder Videos empfangen, die auf Webseiten angezeigt werden sollen, aber diese Funktion war für JavaScript nie verfügbar, bevor 2015 fetch mit Streams eingeführt wurde.

Wenn Sie früher eine Ressource (z. B. ein Video, eine Textdatei usw.) verarbeiten wollten, mussten Sie die gesamte Datei herunterladen, warten, bis sie in ein geeignetes Format deserialisiert wurde, und dann verarbeiten. Mit Streams, die für JavaScript verfügbar sind, ändert sich das. Sie können jetzt Rohdaten mit JavaScript schrittweise verarbeiten, sobald sie auf dem Client verfügbar sind, ohne einen Puffer, String oder Blob generieren zu müssen. Das eröffnet eine Reihe von Anwendungsfällen, von denen einige unten aufgeführt sind:

  • Videoeffekte: Ein lesbarer Videostream wird durch einen Transformierungsstream geleitet, der Effekte in Echtzeit anwendet.
  • Datenkomprimierung/-dekomprimierung: Ein Dateistream wird durch einen Transformationsstream geleitet, der ihn selektiv komprimiert bzw. dekomprimiert.
  • Bilddekodierung: Ein HTTP-Antwortstream wird durch einen Transformationsstream geleitet, der Bytes in Bitmap-Daten decodiert, und dann durch einen weiteren Transformationsstream, der Bitmaps in PNGs umwandelt. Wenn es im fetch-Handler eines Service Workers installiert ist, können Sie neue Bildformate wie AVIF transparent polyfillen.

Unterstützte Browser

ReadableStream und WritableStream

Unterstützte Browser

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Quelle

TransformStream

Unterstützte Browser

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Quelle

Wichtige Konzepte

Bevor ich auf die verschiedenen Arten von Streams eingehe, möchte ich einige grundlegende Konzepte vorstellen.

Chunks

Ein Chunk ist ein einzelnes Datenelement, das in einen Stream geschrieben oder daraus gelesen wird. Sie können beliebigen Typs sein und sogar Blöcke verschiedener Typen enthalten. In den meisten Fällen ist ein Block nicht die kleinste Dateneinheit für einen bestimmten Stream. Ein Bytestream kann beispielsweise Blöcke statt aus einzelnen Byte enthalten, die aus 16 KiB-Uint8Array-Einheiten bestehen.

Lesbare Streams

Ein lesbarer Stream ist eine Datenquelle, aus der Sie Daten lesen können. Mit anderen Worten: Daten fließen aus einem lesbaren Stream. Konkret ist ein lesbarer Stream eine Instanz der Klasse ReadableStream.

Beschreibbare Streams

Ein beschreibbarer Stream ist ein Ziel für Daten, in das Sie schreiben können. Mit anderen Worten, Daten werden in einen beschreibbaren Stream aufgenommen. Ein beschreibbarer Stream ist konkret eine Instanz der Klasse WritableStream.

Streams transformieren

Ein Transformierungsstream besteht aus einem Streampaar: einem beschreibbaren Stream, der als Schreibseite bezeichnet wird, und einem lesbaren Stream, der als Leseseite bezeichnet wird. Eine reale Metapher dafür wäre ein Simultanübersetzer, der spontan von einer Sprache in eine andere übersetzt. Wenn auf die beschreibbare Seite geschrieben wird, werden auf spezifische Weise für den Transformierungsstream neue Daten zum Lesen auf der lesbaren Seite verfügbar gemacht. Konkret kann jedes Objekt mit einer writable-Property und einer readable-Property als Transformierungsstream dienen. Mit der Standardklasse TransformStream ist es jedoch einfacher, ein solches Paar zu erstellen, das richtig verdrillt ist.

Rohrketten

Streams werden in erster Linie verwendet, indem sie per Piping aneinander gesendet werden. Ein lesbarer Stream kann mithilfe der Methode pipeTo() des lesbaren Streams direkt an einen beschreibbaren Stream übergeben werden. Alternativ kann er zuerst mit der Methode pipeThrough() des lesbaren Streams durch einen oder mehrere Transformationsstreams geleitet werden. Eine Reihe von Streams, die auf diese Weise zusammengeführt werden, wird als Pipe-Chain bezeichnet.

Gegendruck

Sobald eine Pipe-Kette erstellt wurde, werden Signale darüber übertragen, wie schnell Chunks durch sie fließen sollen. Wenn ein Schritt in der Kette noch keine Chunks akzeptieren kann, wird ein Signal rückwärts durch die Pipe-Kette gesendet, bis die ursprüngliche Quelle aufgefordert wird, nicht mehr so schnell Chunks zu generieren. Dieser Vorgang zur Normalisierung des Flusses wird als Gegendruck bezeichnet.

Abschlagen

Ein lesbarer Stream kann mithilfe der tee()-Methode geteed (benannt nach der Form eines großen „T“) werden. Dadurch wird der Stream gesperrt, d. h., er kann nicht mehr direkt verwendet werden. Es werden jedoch zwei neue Streams erstellt, sogenannte Branches, die unabhängig voneinander genutzt werden können. Das Teeing ist auch wichtig, weil Streams nicht zurückgespult oder neu gestartet werden können. Mehr dazu später.

Diagramm einer Pipe-Kette, die aus einem lesbaren Stream besteht, der von einem Aufruf der Fetch API stammt und dann durch einen Transformierungsstream geleitet wird, dessen Ausgabe aufgeteilt und dann an den Browser für den ersten resultierenden lesbaren Stream und an den Service Worker-Cache für den zweiten resultierenden lesbaren Stream gesendet wird.
Eine Pipe-Kette.

Funktionsweise eines lesbaren Streams

Ein lesbarer Stream ist eine Datenquelle, die in JavaScript durch ein ReadableStream-Objekt dargestellt wird, das aus einer zugrunde liegenden Quelle stammt. Der Konstruktor von ReadableStream() erstellt und gibt ein lesbares Streamobjekt aus den angegebenen Handlers zurück. Es gibt zwei Arten von zugrunde liegenden Quellen:

  • Push-Quellen senden Ihnen ständig Daten, wenn Sie darauf zugegriffen haben. Sie müssen den Zugriff auf den Stream starten, pausieren oder beenden. Beispiele hierfür sind Live-Videostreams, servergesendete Ereignisse oder WebSockets.
  • Bei Pull-Quellen müssen Sie Daten nach der Verbindung explizit anfordern. Beispiele hierfür sind HTTP-Vorgänge über fetch()- oder XMLHttpRequest-Aufrufe.

Streamdaten werden sequenziell in kleinen Teilen gelesen, die als Chunks bezeichnet werden. Die in einem Stream platzierten Chunks werden als in die Warteschlange gestellt bezeichnet. Das bedeutet, dass sie in einer Warteschlange warten und gelesen werden können. Eine interne Warteschlange überwacht die noch nicht gelesenen Chunks.

Eine Warteschlangenstrategie ist ein Objekt, das bestimmt, wie ein Stream Backpressure basierend auf dem Status seiner internen Warteschlange signalisieren soll. Die Warteschlangenstrategie weist jedem Block eine Größe zu und vergleicht die Gesamtgröße aller Blöcke in der Warteschlange mit einer bestimmten Zahl, der sogenannten High-Water-Mark.

Die Chunks im Stream werden von einem Reader gelesen. Dieser Reader ruft die Daten Block nach dem anderen ab, sodass Sie beliebige Vorgänge damit ausführen können. Der Leser und der zugehörige Verarbeitungscode werden als Nutzer bezeichnet.

Das nächste Konstrukt in diesem Kontext wird als Controller bezeichnet. Jedem lesbaren Stream ist ein Controller zugeordnet, mit dem Sie den Stream steuern können.

Ein Stream kann immer nur von einem Leser gelesen werden. Wenn ein Leser erstellt wird und mit dem Lesen eines Streams beginnt (d. h. ein aktiver Leser wird), wird er für diesen Stream gesperrt. Wenn ein anderer Leser den Stream übernehmen soll, musst du in der Regel zuerst den ersten Leser freigeben, bevor du etwas anderes tun kannst. Du kannst aber auch Streams verknüpfen.

Lesbaren Stream erstellen

Sie erstellen einen lesbaren Stream, indem Sie seinen Konstruktor ReadableStream() aufrufen. Der Konstruktor hat ein optionales Argument underlyingSource, das ein Objekt mit Methoden und Eigenschaften darstellt, die das Verhalten der erstellten Streaminstanz definieren.

underlyingSource

Dazu können die folgenden optionalen, vom Entwickler definierten Methoden verwendet werden:

  • start(controller): Wird sofort bei der Erstellung des Objekts aufgerufen. Die Methode kann auf die Streamquelle zugreifen und alle anderen erforderlichen Schritte zur Einrichtung der Streamfunktion ausführen. Wenn dieser Vorgang asynchron erfolgen soll, kann die Methode ein Versprechen zurückgeben, um Erfolg oder Misserfolg zu signalisieren. Der an diese Methode übergebene Parameter controller ist ein ReadableStreamDefaultController.
  • pull(controller): Kann verwendet werden, um den Stream zu steuern, wenn mehr Blöcke abgerufen werden. Sie wird so lange wiederholt aufgerufen, bis die interne Chunk-Warteschlange des Streams voll ist. Wenn das Ergebnis des Aufrufs von pull() ein Promise ist, wird pull() erst dann wieder aufgerufen, wenn das Promise erfüllt ist. Wenn das Versprechen abgelehnt wird, wird der Stream fehlerhaft.
  • cancel(reason): Wird aufgerufen, wenn der Stream-Nutzer den Stream abbricht.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController unterstützt die folgenden Methoden:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Das zweite, ebenfalls optionale Argument des ReadableStream()-Konstruktors ist queuingStrategy. Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert. Es nimmt zwei Parameter an:

  • highWaterMark: Eine positive Zahl, die den Höchststand des Streams bei Verwendung dieser Warteschlangenstrategie angibt.
  • size(chunk): Eine Funktion, die die endliche, nicht negative Größe des angegebenen Chunk-Werts berechnet und zurückgibt. Das Ergebnis wird verwendet, um den Backpressure zu bestimmen, der über die entsprechende ReadableStreamDefaultController.desiredSize-Property angezeigt wird. Außerdem wird damit festgelegt, wann die pull()-Methode der zugrunde liegenden Quelle aufgerufen wird.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Die Methoden getReader() und read()

Um aus einem lesbaren Stream zu lesen, benötigen Sie einen Leser, also eine ReadableStreamDefaultReader. Die Methode getReader() der ReadableStream-Schnittstelle erstellt einen Leser und sperrt den Stream dafür. Solange der Stream gesperrt ist, kann kein anderer Leser darauf zugreifen, bis dieser freigegeben wird.

Die Methode read() der ReadableStreamDefaultReader-Schnittstelle gibt ein Versprechen zurück, das Zugriff auf den nächsten Chunk in der internen Warteschlange des Streams gewährt. Je nach Status des Streams wird die Anfrage erfüllt oder abgelehnt. Es gibt folgende Möglichkeiten:

  • Wenn ein Chunk verfügbar ist, wird das Versprechen mit einem Objekt der Form
    { value: chunk, done: false } erfüllt.
  • Wenn der Stream geschlossen wird, wird das Promise mit einem Objekt vom Typ
    { value: undefined, done: true } erfüllt.
  • Wenn beim Stream ein Fehler auftritt, wird das Versprechen mit dem entsprechenden Fehler abgelehnt.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Das Attribut locked

Sie können prüfen, ob ein lesbarer Stream gesperrt ist, indem Sie auf seine Eigenschaft ReadableStream.locked zugreifen.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Lesbare Stream-Codebeispiele

Im folgenden Codebeispiel werden alle Schritte veranschaulicht. Sie erstellen zuerst ein ReadableStream, das in seinem underlyingSource-Argument (d. h. der Klasse TimestampSource) eine start()-Methode definiert. Mit dieser Methode wird der controller des Streams angewiesen, alle zehn Sekunden einen Zeitstempel zu enqueue(). Schließlich wird der Controller angewiesen, den Stream close(). Zum Verbrauchen dieses Streams erstellen Sie mit der Methode getReader() einen Leser und rufen read() auf, bis der Stream done ist.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Asynchrone Iteration

Es ist nicht die praktischste API, bei jeder read()-Schleifeniteration zu prüfen, ob der Stream done ist. Glücklicherweise gibt es dazu bald eine bessere Methode: die asynchrone Iteration.

for await (const chunk of stream) {
  console.log(chunk);
}

Eine Möglichkeit, asynchrone Iterationen zu verwenden, besteht darin, das Verhalten mit einer Polyfill zu implementieren.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Lesbaren Stream erstellen

Die Methode tee() der ReadableStream-Schnittstelle teilt den aktuellen lesbaren Stream und gibt ein Array mit zwei Elementen zurück, das die beiden resultierenden Verzweigungen als neue ReadableStream-Instanzen enthält. So können zwei Leser einen Stream gleichzeitig lesen. Sie können dies beispielsweise in einem Service Worker tun, wenn Sie eine Antwort vom Server abrufen und an den Browser streamen, aber auch an den Service Worker-Cache. Da ein Antworttext nicht mehrmals verwendet werden kann, benötigen Sie dazu zwei Kopien. Zum Abbrechen des Streams müssen Sie dann beide resultierenden Zweige abbrechen. Wenn Sie einen Stream trennen, wird er in der Regel für die Dauer der Trennung gesperrt, sodass andere Leser ihn nicht sperren können.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Lesbare Bytestreams

Für Streams, die Bytes darstellen, wird eine erweiterte Version des lesbaren Streams bereitgestellt, um Bytes effizient zu verarbeiten, insbesondere durch Minimierung von Kopien. Mit Bytestreams können Lesegeräte mit eigenem Puffer (Bring Your Own Buffer, BYOB) erworben werden. Die Standardimplementierung kann eine Reihe verschiedener Ausgaben liefern, wie Strings oder Array-Zwischenspeicher im Fall von WebSockets, während Bytestreams die Byte-Ausgabe garantieren. Außerdem bieten BYOB-Lesegeräte Vorteile in Bezug auf die Stabilität. Wenn ein Puffer getrennt wird, kann sichergestellt werden, dass nicht zweimal in denselben Puffer geschrieben wird, wodurch Race-Bedingungen vermieden werden. BYOB-Leser können die Häufigkeit reduzieren, mit der der Browser die Garbage Collection ausführen muss, da Puffer wiederverwendet werden können.

Lesbaren Bytestream erstellen

Sie können einen lesbaren Bytestream erstellen, indem Sie dem Konstruktor von ReadableStream() einen zusätzlichen type-Parameter übergeben.

new ReadableStream({ type: 'bytes' });

underlyingSource

Die zugrunde liegende Quelle eines lesbaren Byte-Streams erhält eine ReadableByteStreamController, die manipuliert werden kann. Die Methode ReadableByteStreamController.enqueue() verwendet ein chunk-Argument, dessen Wert ein ArrayBufferView ist. Das Attribut ReadableByteStreamController.byobRequest gibt die aktuelle BYOB-Pull-Anfrage oder null zurück, wenn keine vorhanden ist. Die Eigenschaft ReadableByteStreamController.desiredSize gibt schließlich die gewünschte Größe zurück, um die interne Warteschlange des gesteuerten Streams zu füllen.

queuingStrategy

Das zweite, ebenfalls optionale Argument des Konstruktors ReadableStream() ist queuingStrategy. Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert. Es nimmt einen Parameter an:

  • highWaterMark: Eine nicht negative Anzahl von Byte, die den Höchststand des Streams bei Verwendung dieser Warteschlangenstrategie angibt. Er wird verwendet, um den Rückstand zu bestimmen, der sich über das entsprechende ReadableByteStreamController.desiredSize-Attribut auswirkt. Außerdem wird damit festgelegt, wann die pull()-Methode der zugrunde liegenden Quelle aufgerufen wird.

Die Methoden getReader() und read()

Sie können dann auf eine ReadableStreamBYOBReader zugreifen, indem Sie den Parameter mode entsprechend festlegen: ReadableStream.getReader({ mode: "byob" }). So lässt sich die Pufferzuweisung genauer steuern, um Kopien zu vermeiden. Wenn Sie aus dem Byte-Stream lesen möchten, müssen Sie ReadableStreamBYOBReader.read(view) aufrufen, wobei view ein ArrayBufferView ist.

Codebeispiel für lesbaren Bytestream

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Die folgende Funktion gibt lesbare Bytestreams zurück, die ein effizientes Lesen eines zufällig generierten Arrays ohne Kopiervorgang ermöglichen. Anstatt eine vordefinierte Blockgröße von 1.024 zu verwenden, wird versucht, den vom Entwickler bereitgestellten Puffer zu füllen, was eine vollständige Kontrolle ermöglicht.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Funktionsweise eines beschreibbaren Streams

Ein beschreibbarer Stream ist ein Ziel, in das Sie Daten schreiben können, die in JavaScript durch ein WritableStream-Objekt dargestellt werden. Dies dient als Abstraktion über einem untergeordneten Sink, einem I/O-Sink auf niedrigerer Ebene, in den Rohdaten geschrieben werden.

Die Daten werden über einen Writer nach und nach in den Stream geschrieben. Ein Chunk kann viele Formen annehmen, wie die Teile in einem Reader. Sie können jeden beliebigen Code verwenden, um die Blöcke zu erstellen, die zum Schreiben bereit sind. Der Autor und der zugehörige Code werden als Producer bezeichnet.

Wenn ein Writer erstellt wird und mit dem Schreiben in einen Stream beginnt (ein aktiver Writer), wird er für diesen Stream gesperrt. In einen beschreibbaren Stream kann jeweils nur ein Writer schreiben. Wenn ein anderer Autor mit dem Schreiben in Ihren Stream beginnen soll, müssen Sie ihn in der Regel freigeben, bevor Sie ihm einen weiteren Autor hinzufügen.

Eine interne Warteschlange verfolgt die Blöcke, die in den Stream geschrieben, aber noch nicht von der zugrunde liegenden Senke verarbeitet wurden.

Eine Warteschlangenstrategie ist ein Objekt, das bestimmt, wie ein Stream Backpressure basierend auf dem Status seiner internen Warteschlange signalisieren soll. Die Warteschlangenstrategie weist jedem Block eine Größe zu und vergleicht die Gesamtgröße aller Blöcke in der Warteschlange mit einer bestimmten Zahl, der sogenannten High-Water-Mark.

Das endgültige Konstrukt wird als Controller bezeichnet. Jedem beschreibbaren Stream ist ein Controller zugeordnet, mit dem Sie den Stream steuern (z. B. abbrechen).

Schreibbaren Stream erstellen

Die WritableStream-Schnittstelle der Streams API bietet eine Standardabstraktion zum Schreiben von Streamingdaten in ein Ziel, auch als Senke bezeichnet. Dieses Objekt bietet integrierte Rückstau- und Warteschlangenfunktionen. Sie erstellen einen beschreibbaren Stream, indem Sie seinen Konstruktor WritableStream() aufrufen. Es hat einen optionalen Parameter underlyingSink, der ein Objekt mit Methoden und Eigenschaften darstellt, die das Verhalten der erstellten Streaminstanz definieren.

underlyingSink

Die underlyingSink kann die folgenden optionalen, vom Entwickler definierten Methoden enthalten. Der controller-Parameter, der an einige der Methoden übergeben wird, ist ein WritableStreamDefaultController.

  • start(controller): Diese Methode wird sofort beim Erstellen des Objekts aufgerufen. Der Inhalt dieser Methode sollte darauf abzielen, Zugriff auf den zugrunde liegenden Datenablauf zu erhalten. Wenn dieser Vorgang asynchron erfolgen soll, kann ein Versprechen zurückgegeben werden, um Erfolg oder Misserfolg anzuzeigen.
  • write(chunk, controller): Diese Methode wird aufgerufen, wenn ein neuer Datenblock (im Parameter chunk angegeben) in den zugrunde liegenden Datenablauf geschrieben werden kann. Sie kann ein Versprechen zurückgeben, um den Erfolg oder Misserfolg des Schreibvorgangs anzuzeigen. Diese Methode wird nur aufgerufen, nachdem vorherige Schreibvorgänge erfolgreich waren, und nie, nachdem der Stream geschlossen oder abgebrochen wurde.
  • close(controller): Diese Methode wird aufgerufen, wenn die App signalisiert, dass das Schreiben von Blöcken in den Stream abgeschlossen ist. Der Inhalt sollte alles Notwendige tun, um die Schreibvorgänge in der zugrunde liegenden Senke abzuschließen und den Zugriff darauf freizugeben. Wenn dieser Vorgang asynchron ist, kann er ein Versprechen zurückgeben, um Erfolg oder Fehlschlag zu signalisieren. Diese Methode wird erst aufgerufen, nachdem alle in der Warteschlange befindlichen Schreibvorgänge erfolgreich waren.
  • abort(reason): Diese Methode wird aufgerufen, wenn die App signalisiert, dass sie den Stream abrupt schließen und in einen Fehlerstatus versetzen möchte. Damit können alle aufgehaltenen Ressourcen bereinigt werden, ähnlich wie bei close(), aber abort() wird aufgerufen, auch wenn Schreibvorgänge sich in der Warteschlange befinden. Diese Teile werden weggeworfen. Wenn dieser Vorgang asynchron ist, kann er ein Versprechen zurückgeben, um Erfolg oder Fehlschlag zu signalisieren. Der Parameter reason enthält einen DOMString, der beschreibt, warum der Stream abgebrochen wurde.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Die WritableStreamDefaultController-Schnittstelle der Streams API stellt einen Controller dar, mit dem der Status eines WritableStream während der Einrichtung gesteuert werden kann, wenn weitere Chunks zum Schreiben gesendet werden, oder am Ende des Schreibens. Beim Erstellen einer WritableStream wird dem zugrunde liegenden Sink eine entsprechende WritableStreamDefaultController-Instanz zum Manipulieren zugewiesen. Die WritableStreamDefaultController hat nur eine Methode: WritableStreamDefaultController.error(). Dies führt dazu, dass zukünftige Interaktionen mit dem zugehörigen Stream zu Fehlern führen. WritableStreamDefaultController unterstützt auch ein signal-Attribut, das eine Instanz von AbortSignal zurückgibt, sodass ein WritableStream-Vorgang bei Bedarf beendet werden kann.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Das zweite, ebenfalls optionale Argument des Konstruktors WritableStream() ist queuingStrategy. Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert. Hierfür sind zwei Parameter erforderlich:

  • highWaterMark: Eine positive Zahl, die den Höchststand des Streams bei Verwendung dieser Warteschlangenstrategie angibt.
  • size(chunk): Eine Funktion, die die endliche, nicht negative Größe des angegebenen Chunk-Werts berechnet und zurückgibt. Das Ergebnis wird verwendet, um den Rückstand zu bestimmen, der sich über das entsprechende WritableStreamDefaultWriter.desiredSize-Attribut zeigt.

Die Methoden getWriter() und write()

Zum Schreiben in einen beschreibbaren Stream benötigen Sie einen Schreiber. Dies ist eine WritableStreamDefaultWriter. Die getWriter()-Methode der WritableStream-Schnittstelle gibt eine neue Instanz von WritableStreamDefaultWriter zurück und sperrt den Stream für diese Instanz. Während der Stream gesperrt ist, kann kein anderer Autor abgerufen werden, bis der aktuelle Stream freigegeben wird.

Die Methode write() der Schnittstelle WritableStreamDefaultWriter schreibt einen übergebenen Datenblock in eine WritableStream und den zugehörigen Datenablauf und gibt dann ein Versprechen zurück, das den Erfolg oder Misserfolg des Schreibvorgangs angibt. Was "Erfolg" bedeutet, hängt von der zugrunde liegenden Senke ab. Es kann darauf hinweisen, dass der Block akzeptiert wurde, und nicht unbedingt, dass er sicher an seinem endgültigen Ziel gespeichert wurde.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Das locked-Attribut

Sie können prüfen, ob ein beschreibbarer Stream gesperrt ist. Dazu rufen Sie sein Attribut WritableStream.locked auf.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Codebeispiel für einen beschreibbaren Stream

Im folgenden Codebeispiel werden alle Schritte veranschaulicht.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Lesestream an einen Schreibstream weiterleiten

Ein lesbarer Stream kann mit der Methode pipeTo() des lesbaren Streams an einen beschreibbaren Stream weitergeleitet werden. ReadableStream.pipeTo() leitet die aktuelle ReadableStream an eine bestimmte WritableStream weiter und gibt ein Versprechen zurück, das erfüllt wird, wenn der Pipe-Vorgang erfolgreich abgeschlossen wurde, oder abgelehnt wird, wenn Fehler aufgetreten sind.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Transformationsstream erstellen

Die TransformStream-Schnittstelle der Streams API stellt eine Reihe von transformierbaren Daten dar. Sie erstellen einen Transformierungsstream, indem Sie den Konstruktor TransformStream() aufrufen. Dieser erstellt und gibt ein Transformierungsstreamobjekt aus den angegebenen Handlers zurück. Der TransformStream()-Konstruktor akzeptiert als erstes Argument ein optionales JavaScript-Objekt, das transformer darstellt. Solche Objekte können eine der folgenden Methoden enthalten:

transformer

  • start(controller): Diese Methode wird sofort aufgerufen, wenn das Objekt erstellt wird. In der Regel wird dies verwendet, um Präfixblöcke mit controller.enqueue() in die Warteschlange zu stellen. Diese werden von der lesbaren Seite gelesen, sind aber nicht von Schreibvorgängen auf der beschreibbaren Seite abhängig. Wenn dieser anfängliche Vorgang asynchron ist, z. B. weil es einige Mühe kostet, die Prefix-Chunks zu erhalten, kann die Funktion ein Versprechen zurückgeben, um Erfolg oder Misserfolg zu signalisieren. Ein abgelehntes Versprechen führt zu einem Fehler im Stream. Alle geworfenen Ausnahmen werden vom TransformStream()-Konstruktor noch einmal geworfen.
  • transform(chunk, controller): Diese Methode wird aufgerufen, wenn ein neuer Chunk, der ursprünglich auf die beschreibbare Seite geschrieben wurde, zur Transformation bereit ist. Die Streamimplementierung sorgt dafür, dass diese Funktion nur aufgerufen wird, nachdem vorherige Transformationen erfolgreich waren, und nie, bevor start() abgeschlossen ist oder flush() aufgerufen wurde. Diese Funktion führt die eigentliche Transformation des Transformstreams aus. Die Ergebnisse können mit controller.enqueue() in die Warteschlange eingereiht werden. So kann ein einzelner Chunk, der auf die beschreibbare Seite geschrieben wird, zu null oder mehreren Chunks auf der lesbaren Seite führen, je nachdem, wie oft controller.enqueue() aufgerufen wird. Wenn der Transformationsprozess asynchron ist, kann diese Funktion eine Versprechung zurückgeben, um den Erfolg oder Misserfolg der Transformation zu signalisieren. Ein abgelehntes Promise führt sowohl auf der lesbaren als auch auf der beschreibbaren Seite des Transformationsstreams zu Fehlern. Wenn keine transform()-Methode angegeben ist, wird die Identitätstransformation verwendet, bei der die Blöcke unverändert von der beschreibbaren Seite in die lesbare Warteschlange gestellt werden.
  • flush(controller): Diese Methode wird aufgerufen, nachdem alle auf die beschreibbare Seite geschriebenen Chunks transformiert wurden, indem sie transform() erfolgreich durchlaufen haben, und die beschreibbare Seite geschlossen werden soll. Normalerweise werden damit Suffix-Chunks in der lesbaren Seite anstehen gelassen, bevor auch diese geschlossen wird. Wenn der Löschvorgang asynchron ist, kann die Funktion ein Versprechen zurückgeben, um Erfolg oder Misserfolg anzuzeigen. Das Ergebnis wird an den Aufrufer von stream.writable.write() gesendet. Außerdem führt ein abgelehntes Versprechen zu Fehlern auf der lesbaren und der beschreibbaren Seite des Streams. Das Auslösen einer Ausnahme wird genauso behandelt wie die Rückgabe einer abgelehnten Versprechen.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Die Warteschlangenstrategien writableStrategy und readableStrategy

Die optionalen Parameter 2 und 3 des Konstruktors TransformStream() sind die optionalen writableStrategy- und readableStrategy-Warteschlangenstrategien. Sie sind so definiert, wie in den Streamabschnitten lesbar und beschreibbar beschrieben.

Codebeispiel für die Streamtransformation

Das folgende Codebeispiel zeigt einen einfachen Transformationsstream in Aktion.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Einen lesbaren Stream durch einen Transformationsstream leiten

Die Methode pipeThrough() der ReadableStream-Schnittstelle bietet eine verkettbare Möglichkeit, den aktuellen Stream durch einen Transformationsstream oder ein anderes Schreib-/Lesepaar zu leiten. Wenn ein Stream gepipet wird, wird er in der Regel für die Dauer der Pipe gesperrt, sodass andere Leser ihn nicht sperren können.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Das nächste Codebeispiel (etwas konstruiert) zeigt, wie Sie eine „Schreibmaschinen“-Version von fetch() implementieren können, bei der der gesamte Text in Großbuchstaben geschrieben wird, indem das zurückgegebene Antwortversprechen als Stream verwendet und der Text nach und nach in Großbuchstaben geschrieben wird. Der Vorteil dieses Ansatzes besteht darin, dass Sie nicht warten müssen, bis das gesamte Dokument heruntergeladen wurde. Das kann bei großen Dateien einen großen Unterschied machen.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

In der Demo unten werden lesbare, beschreibbare und transformierte Streams in Aktion gezeigt. Sie enthält auch Beispiele für pipeThrough()- und pipeTo()-Rohrketten und veranschaulicht tee(). Sie können die Demo optional in einem eigenen Fenster ausführen oder sich den Quellcode ansehen.

Nützliche Streams im Browser

Im Browser sind eine Reihe nützlicher Streams integriert. Sie können ganz einfach ein ReadableStream aus einem Blob erstellen. Die Methode stream() der Schnittstelle Blob gibt einen ReadableStream zurück, der beim Lesen die im Blob enthaltenen Daten zurückgibt. Ein File-Objekt ist eine bestimmte Art von Blob und kann in jedem Kontext verwendet werden, den ein Blob bietet.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Die Streamingvarianten von TextDecoder.decode() und TextEncoder.encode() heißen TextDecoderStream bzw. TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Die Komprimierung oder Dekomprimierung einer Datei ist mit den Transformationsstreams CompressionStream und DecompressionStream einfach. Im folgenden Codebeispiel wird gezeigt, wie du die Streams-Spezifikation herunterladen, direkt im Browser komprimieren (gzip) und die komprimierte Datei direkt auf die Festplatte schreiben kannst.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream der File System Access API und die experimentellen fetch()-Anfragestreams sind Beispiele für beschreibbare Streams in der Praxis.

In der Serial API werden sowohl lesbare als auch beschreibbare Streams häufig verwendet.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Schließlich werden mit der WebSocketStream API Streams in die WebSocket API eingebunden.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Nützliches Infomaterial

Danksagungen

Dieser Artikel wurde von Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley und Adam Rice gelesen. Die Blogbeiträge von Jake Archibald haben mir sehr geholfen, Streams zu verstehen. Einige der Codebeispiele wurden von den Experimenten des GitHub-Nutzers @bellbind inspiriert. Teile des Texts basieren stark auf den MDN Web Docs zu Streams. Die Autoren des Streams-Standards haben hervorragende Arbeit bei der Erstellung dieser Spezifikation geleistet. Hero-Image von Ryan Lara auf Unsplash.