Streams – der ultimative Leitfaden

Lesen Sie, wie Sie lesbare, beschreibbare und transformierte Streams mit der Streams API verwenden.

Mit der Streams API können Sie programmatisch auf Datenstreams zugreifen, die über das Netzwerk empfangen wurden. oder auf irgendeine Weise lokal erstellt werden, mit JavaScript verarbeiten können. Beim Streaming wird eine Ressource aufgegliedert, die Sie empfangen, senden oder transformieren möchten. in kleine Blöcke und verarbeitet sie dann Stück für Stück. Beim Streaming geht es darum, wenn sie Assets wie HTML oder Videos empfangen, die auf Webseiten angezeigt werden sollen, Die Funktion war für JavaScript erst verfügbar, nachdem fetch mit Streams 2015 eingeführt wurde.

Wenn Sie früher eine Ressource (z. B. ein Video, eine Textdatei usw.) verarbeiten wollten, müssten Sie die gesamte Datei herunterladen, warten, bis sie in ein geeignetes Format deserialisiert wurde, und dann zu verarbeiten. Streams sind für alle JavaScript ausführen, ändert sich alles. Sie können Rohdaten mit JavaScript schrittweise verarbeiten, sobald sie auf dem Client verfügbar sind, ohne einen Puffer, einen String oder ein Blob generieren zu müssen. Dadurch ergeben sich für Sie eine Reihe von Anwendungsfällen, von denen ich einige unten auflistee:

  • Videoeffekte:Wird ein lesbarer Videostream durch einen Transformationsstream geleitet, bei dem Effekte angewendet werden. in Echtzeit.
  • Daten- bzw. De-komprimierung:Weiterleitung eines Dateistreams durch einen Transformationsstream, der selektiv (de)komprimiert.
  • Bilddecodierung:Weiterleitung eines HTTP-Antwortstreams durch einen Transformationsstream, der Byte decodiert in Bitmapdaten und dann über einen weiteren Transformationsstream, der Bitmaps in PNGs umwandelt. Wenn die im fetch-Handler eines Service Workers installiert sind, können Sie neue Bildformate wie AVIF.

Unterstützte Browser

ReadableStream und WritableStream

Unterstützte Browser

  • Chrome: 43. <ph type="x-smartling-placeholder">
  • Edge: 14. <ph type="x-smartling-placeholder">
  • Firefox: 65 <ph type="x-smartling-placeholder">
  • Safari: 10.1 <ph type="x-smartling-placeholder">

Quelle

TransformStream

Unterstützte Browser

  • Chrome: 67. <ph type="x-smartling-placeholder">
  • Edge: 79. <ph type="x-smartling-placeholder">
  • Firefox: 102 <ph type="x-smartling-placeholder">
  • Safari: 14.1 <ph type="x-smartling-placeholder">

Quelle

Wichtige Konzepte

Bevor ich die verschiedenen Arten von Streams näher beschreibe, möchte ich einige Kernkonzepte erläutern.

Klötze

Ein Chunk ist ein einzelnes Datenelement, das in einen Stream geschrieben oder aus diesem gelesen wird. Dabei kann es sich um Typ; Streams können sogar Blöcke verschiedener Typen enthalten. Meistens ist ein Chunk nicht besonders atomar Dateneinheit für einen bestimmten Stream. Ein Bytestream kann beispielsweise Blöcke enthalten, die aus 16 KiB-Uint8Array-Einheiten anstelle von einzelnen Byte.

Lesbare Streams

Ein lesbarer Stream stellt eine Datenquelle dar, aus der gelesen werden kann. Mit anderen Worten, Daten kommen aus einem lesbaren Stream. Konkret ist ein lesbarer Stream eine Instanz von ReadableStream. .

Beschreibbare Streams

Ein beschreibbarer Stream stellt ein Ziel für Daten dar, in das geschrieben werden kann. Mit anderen Worten, Daten wird in einen schreibbaren Stream geladen. Ein beschreibbarer Stream ist eine Instanz des Klasse WritableStream.

Streams transformieren

Ein Transformationsstream besteht aus einem Paar von Streams: einem beschreibbaren Stream, der auch als beschreibbare Seite bezeichnet wird. und einem lesbaren Stream, der als lesbare Seite bezeichnet wird. Eine Metapher hierfür wäre Simultandolmetscher der spontan von einer Sprache in eine andere übersetzt. In einer für den Transformationsstream spezifischen Weise führt dazu, dass neue Daten zum Lesen Seite gut lesbar ist. Genauer gesagt kann jedes Objekt mit einer writable- und readable-Eigenschaft als Transformationsstream. Mit der Standardklasse TransformStream ist es jedoch einfacher, ein sogenanntes Paar.

Rohrketten

Streams werden in erster Linie verwendet, indem sie über Pipes aneinander gesendet werden. An einen lesbaren Stream kann direkt weitergeleitet werden mit der Methode pipeTo() des lesbaren Streams in einen schreibbaren Stream übertragen oder durch eine Pipeline an einen schreibbaren Stream geleitet werden. oder mehr Transformationsstreams zuerst mit der Methode pipeThrough() des lesbaren Streams. Eine Reihe von auf diese Weise miteinander verbundene Ströme, wird als Rohrkette bezeichnet.

Rückdruck

Sobald eine Pipe-Kette erstellt ist, leitet sie Signale weiter, die angeben, wie schnell Blöcke fließen sollen. durch sie hindurch. Wenn ein Schritt in der Kette noch keine Blöcke akzeptieren kann, wird das Signal rückwärts weitergeleitet. durch die Rohrkette verlaufen, bis die ursprüngliche Quelle schließlich aufgefordert wird, die Produktion von Stücken einzustellen, schnell. Dieser Prozess der Normalisierung des Flusses wird als Rückdruck bezeichnet.

Abschlag

Ein lesbarer Stream kann mithilfe der Methode tee() über ein T-Shirt verknüpft werden, das nach der Form eines Großbuchstabens „T“ benannt ist. Dadurch wird der Stream gesperrt, er kann also nicht mehr direkt verwendet werden. Es werden jedoch zwei neue Streams, sogenannte Zweige, die unabhängig verarbeitet werden können. Teeing ist auch wichtig, weil Streams nicht zurückgespult oder neu gestartet werden können. Mehr dazu später.

<ph type="x-smartling-placeholder">
</ph> Diagramm einer Pipe-Kette, bestehend aus einem lesbaren Stream, der von einem Aufruf an die Fetch API eingeht, die dann durch einen Transformationsstream geleitet wird, dessen Ausgabe ein T-Shirt ist, und dann für den ersten resultierenden lesbaren Stream an den Browser und für den zweiten resultierenden lesbaren Stream an den Service Worker-Cache gesendet wird.
Eine Rohrkette.

Funktionsweise eines lesbaren Streams

Ein lesbarer Stream ist eine Datenquelle, die in JavaScript durch ein ReadableStream-Objekt, das aus einer zugrunde liegenden Quelle. Die ReadableStream() -Konstruktor erstellt und gibt ein lesbares Streamobjekt aus den angegebenen Handlern zurück. Es gibt zwei Typen der zugrunde liegenden Quelle:

  • Push-Quellen senden ständig Daten an Sie, wenn Sie darauf zugreifen, und es liegt an Ihnen, dies zu tun den Zugriff auf den Stream starten, pausieren oder abbrechen. Beispiele hierfür sind Live-Videostreams, vom Server gesendete Ereignisse, oder WebSockets.
  • Bei Pull-Quellen müssen Sie explizit Daten von ihnen anfordern, sobald eine Verbindung besteht. Beispiele HTTP-Vorgänge über fetch()- oder XMLHttpRequest-Aufrufe einschließen.

Streamdaten werden sequenziell in kleinen Abschnitten gelesen, die als Blöcke bezeichnet werden. Die in einem Stream platzierten Blöcke gelten als in die Warteschlange. Das bedeutet, dass sie in einer Warteschlange warten. und kann gelesen werden. Eine interne Warteschlange verfolgt die Blöcke, die noch nicht gelesen wurden.

Eine Warteschlangenstrategie ist ein Objekt, das festlegt, wie ein Stream den Gegendruck auf der Grundlage den Status seiner internen Warteschlange. Die Warteschlangenstrategie weist jedem Block eine Größe zu und vergleicht Gesamtgröße aller Blöcke in der Warteschlange auf eine angegebene Zahl, die als Hochwassermarkierung bezeichnet wird.

Die Blöcke im Stream werden von einem Leser gelesen. Dieser Reader ruft die Daten Stück für Stück sodass Sie genau die Art von Operation ausführen können, die Sie damit durchführen möchten. Der Leser und die anderen zugehörigen Verarbeitungscode wird als Consumer bezeichnet.

Das nächste Konstrukt wird in diesem Zusammenhang als Controller bezeichnet. Jedem lesbaren Stream ist ein Controller, mit dem du, wie der Name schon sagt, den Stream steuern kannst.

Ein Stream kann jeweils nur von einem Leser gelesen werden. Ein Leser wird erstellt und beginnt, einen Stream zu lesen. also zu einem aktiven Leser wird, ist er gesperrt. Wenn Sie möchten, dass ein anderer Leser Wenn Sie Ihren Stream lesen, müssen Sie in der Regel das erste Lesegerät freigeben, bevor Sie etwas anderes tun. (Du kannst aber einen Tee mit Stream starten).

Lesbaren Stream erstellen

Sie erstellen einen lesbaren Stream, indem Sie dessen Konstruktor aufrufen. ReadableStream() Der Konstruktor hat das optionale Argument underlyingSource, das ein Objekt darstellt. mit Methoden und Attributen, die das Verhalten der erstellten Streaminstanz definieren.

underlyingSource

Dazu können die folgenden optionalen, vom Entwickler definierten Methoden verwendet werden:

  • start(controller): Wird sofort bei der Erstellung des Objekts aufgerufen. Die kann auf die Streamquelle zugreifen und weitere Aktionen ausführen. die für die Einrichtung der Streamfunktion erforderlich sind. Wenn dieser Vorgang asynchron durchgeführt werden soll, kann die Methode ein Versprechen zurückgeben, das Erfolg oder Misserfolg signalisiert. Der an diese Methode übergebene controller-Parameter ist eine ReadableStreamDefaultController
  • pull(controller): Kann verwendet werden, um den Stream zu steuern, wenn mehr Blöcke abgerufen werden. Es wird wiederholt aufgerufen, solange die interne Warteschlange des Streams mit Chunks nicht voll ist, bis die Warteschlange erreicht sein Hochwasser. Wenn das Ergebnis des Aufrufs von pull() ein Promise ist, pull() wird erst dann wieder angerufen, wenn das Versprechen erfüllt ist. Wenn das Versprechen abgelehnt wird, wird für den Stream ein Fehler ausgegeben.
  • cancel(reason): Wird aufgerufen, wenn der Stream-Nutzer den Stream abbricht.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController unterstützt die folgenden Methoden:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Das zweite, ebenfalls optionale Argument des ReadableStream()-Konstruktors ist queuingStrategy. Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert, die zwei Parameter:

  • highWaterMark: Eine nicht negative Zahl, die das Hochwasser des Flusses angibt, für das diese Warteschlangenstrategie verwendet wird.
  • size(chunk): Eine Funktion, die die endliche nicht negative Größe des gegebenen Chunk-Werts berechnet und zurückgibt. Das Ergebnis wird verwendet, um den Rückstand zu bestimmen, der sich über das entsprechende ReadableStreamDefaultController.desiredSize-Attribut zeigt. Sie bestimmt auch, wann die Methode pull() der zugrunde liegenden Quelle aufgerufen wird.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Die Methoden getReader() und read()

Zum Lesen aus einem lesbaren Stream benötigen Sie einen Reader. ReadableStreamDefaultReader Die Methode getReader() der ReadableStream-Schnittstelle erstellt einen Leser und sperrt den Stream für . Solange der Stream gesperrt ist, kann kein weiterer Leser erworben werden, bis dieser übertragen wird.

Die read() der ReadableStreamDefaultReader-Schnittstelle ein Promise zurückgibt, das Zugriff auf das nächste in die interne Warteschlange des Streams ein. Sie erfüllt oder lehnt ein Ergebnis ab, je nachdem, in den Stream aufnehmen. Es gibt folgende Möglichkeiten:

  • Wenn ein Chunk verfügbar ist, wird das Versprechen mit einem Objekt der Form erfüllt
    { value: chunk, done: false }.
  • Wenn der Stream geschlossen wird, wird das Versprechen mit einem Objekt im folgenden Format erfüllt:
    { value: undefined, done: true }.
  • Wenn beim Stream ein Fehler auftritt, wird das Promise mit dem entsprechenden Fehler abgelehnt.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Das Attribut locked

Sie können überprüfen, ob ein lesbarer Stream gesperrt ist, indem Sie auf die ReadableStream.locked Property.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Lesbare Stream-Codebeispiele

Das Codebeispiel unten zeigt alle Schritte in Aktion. Sie erstellen zuerst ein ReadableStream, das in seiner Argument underlyingSource (d. h. die Klasse TimestampSource) definiert eine start()-Methode. Mit dieser Methode wird dem controller des Streams mitgeteilt, enqueue() während zehn Sekunden pro Sekunde einen Zeitstempel. Schließlich wird der Controller angewiesen, den Stream close(). Sie konsumieren das indem du mit der Methode getReader() einen Leser erstellst und read() aufrufst, bis der Stream done

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Asynchrone Iteration

Die Prüfung bei jeder read()-Schleifeniteration, wenn der Stream done ist, ist möglicherweise nicht die bequemste API. Glücklicherweise gibt es dazu bald eine bessere Methode: die asynchrone Iteration.

for await (const chunk of stream) {
  console.log(chunk);
}

Eine Problemumgehung für die aktuelle Verwendung der asynchronen Iteration besteht darin, das Verhalten mit einem Polyfill zu implementieren.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Teeing eines lesbaren Streams

Die Methode tee() des Die ReadableStream-Schnittstelle verknüpft den aktuell lesbaren Stream und gibt ein Array aus zwei Elementen zurück. die die beiden resultierenden Zweige als neue ReadableStream-Instanzen enthält. Dadurch können Sie zwei Lesern, um einen Stream gleichzeitig zu lesen. Sie können dies beispielsweise in einem Service Worker tun, eine Antwort vom Server abrufen und an den Browser streamen, aber auch an den Service Worker-Cache. Da ein Antworttext nicht mehr als einmal verarbeitet werden kann, benötigen Sie zwei Kopien um dies zu tun. Zum Abbrechen des Streams müssen Sie dann beide resultierenden Zweige abbrechen. Stream starten wird sie in der Regel für die Dauer gesperrt, sodass andere Leser sie nicht sperren können.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Lesbare Bytestreams

Für Streams, die Byte darstellen, wird eine erweiterte Version des lesbaren Streams bereitgestellt, um Bytes effizient reduzieren, insbesondere durch das Minimieren von Kopien. Byte-Streams ermöglichen Zwischenspeichern (Bring-Your-Own-Zwischenspeicher) BYOB-Leser zu gewinnen. Die Standardimplementierung kann eine Reihe verschiedener Ausgaben liefern, z. B. im Fall von WebSockets als Zeichenfolgen oder Array-Zwischenspeicher, wohingegen Byte-Streams die Byte-Ausgabe garantieren. Darüber hinaus bieten BYOB-Lesegeräte Stabilitätsvorteile. Dies ist denn wenn ein Zwischenspeicher getrennt wird, kann er nicht zweimal in denselben Zwischenspeicher schreiben. wodurch Race-Bedingungen vermieden werden. BYOB-Lesegeräte können die Ausführungshäufigkeit des Browsers reduzieren automatische Speicherbereinigung, da Puffer wiederverwendet werden können.

Einen lesbaren Bytestream erstellen

Sie können einen lesbaren Bytestream erstellen, indem Sie einen zusätzlichen type-Parameter an den ReadableStream()-Konstruktor.

new ReadableStream({ type: 'bytes' });

underlyingSource

Die zugrunde liegende Quelle eines lesbaren Bytestreams wird mit einem ReadableByteStreamController zu manipulieren. Die ReadableByteStreamController.enqueue()-Methode verwendet ein chunk-Argument, dessen Wert ist ein ArrayBufferView. Das Attribut ReadableByteStreamController.byobRequest gibt den aktuellen Wert BYOB-Pull-Anfrage oder null, wenn keine vorhanden ist. Schließlich wird der ReadableByteStreamController.desiredSize gibt die gewünschte Größe zurück, um die interne Warteschlange des kontrollierten Streams zu füllen.

queuingStrategy

Das zweite, ebenfalls optionale Argument des ReadableStream()-Konstruktors ist queuingStrategy. Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert, wofür eine Parameter:

  • highWaterMark: Eine nicht negative Anzahl von Byte, die das Hochwasser des Streams mit dieser Warteschlangenstrategie angibt. Er wird verwendet, um den Rückstand zu bestimmen, der sich über das entsprechende ReadableByteStreamController.desiredSize-Attribut auswirkt. Sie bestimmt auch, wann die Methode pull() der zugrunde liegenden Quelle aufgerufen wird.

Die Methoden getReader() und read()

Sie können dann Zugriff auf ein ReadableStreamBYOBReader erhalten, indem Sie den mode-Parameter entsprechend festlegen: ReadableStream.getReader({ mode: "byob" }) Dies ermöglicht eine präzisere Kontrolle über den Zwischenspeicher. um Kopien zu vermeiden. Um aus dem Bytestream zu lesen, müssen Sie ReadableStreamBYOBReader.read(view), wobei view ein ArrayBufferView

Lesbares Bytestream-Codebeispiel

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Die folgende Funktion gibt lesbare Bytestreams zurück, die ein effizientes Lesen eines ein zufällig generiertes Array. Anstatt eine vorgegebene Chunk-Größe von 1.024 zu verwenden, des vom Entwickler bereitgestellten Zwischenspeichers und ermöglicht so volle Kontrolle.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Mechanismen eines beschreibbaren Streams

Ein beschreibbarer Stream ist ein Ziel, in das Sie Daten schreiben können. Diese werden in JavaScript durch einen WritableStream-Objekt. Dieses dient als Abstraktion über einer zugrunde liegenden Senke – einer untergeordneten E/A-Senke, in die Rohdaten geschrieben werden.

Die Daten werden Block für Block über einen writer in den Stream geschrieben. Ein Chunk kann Formen, wie die Blöcke in einem Reader. Sie können jeden beliebigen Code verwenden, die Blöcke bereit zum Schreiben; Der Autor und der zugehörige Code werden als Producer bezeichnet.

Wenn ein Autor erstellt wird und anfängt, in einen Stream zu schreiben (ein aktiver Autor), gilt dies als nicht darauf zugreifen. Es kann jeweils nur ein Schreiber in einen beschreibbaren Stream schreiben. Wenn Sie eine weitere Writer, um in Ihren Stream zu schreiben, müssen Sie ihn in der Regel freigeben, bevor Sie ihn anhängen. mit einem anderen Autor.

Eine interne Warteschlange verfolgt die Blöcke, die in den Stream geschrieben wurden, aber noch nicht. von der zugrunde liegenden Senke verarbeitet.

Eine Warteschlangenstrategie ist ein Objekt, das festlegt, wie ein Stream den Gegendruck auf der Grundlage den Status seiner internen Warteschlange. Die Warteschlangenstrategie weist jedem Block eine Größe zu und vergleicht Gesamtgröße aller Blöcke in der Warteschlange auf eine angegebene Zahl, die als Hochwassermarkierung bezeichnet wird.

Das endgültige Konstrukt wird als Controller bezeichnet. Jedem beschreibbaren Stream ist ein Controller zugeordnet, der ermöglicht dir das Steuern des Streams (z. B., um ihn abzubrechen).

Beschreibbaren Stream erstellen

Die WritableStream-Oberfläche von bietet die Streams API eine Standardabstraktion zum Schreiben von Streamingdaten in ein Ziel. als Senke. Dieses Objekt verfügt über einen integrierten Rückdruck und eine Wiedergabeliste. Sie erstellen einen beschreibbaren Stream, indem Sie durch Aufrufen seines Konstruktors WritableStream() Sie hat einen optionalen underlyingSink-Parameter, der ein Objekt darstellt. mit Methoden und Attributen, die das Verhalten der erstellten Streaminstanz definieren.

underlyingSink

underlyingSink kann die folgenden optionalen, vom Entwickler definierten Methoden enthalten. Das controller -Parameter, der an einige der Methoden übergeben wird, WritableStreamDefaultController

  • start(controller): Diese Methode wird sofort aufgerufen, wenn das Objekt erstellt wird. Die Der Inhalt dieser Methode sollte darauf abzielen, Zugriff auf die zugrunde liegende Senke zu erhalten. Wenn dieser Prozess asynchron ausgeführt wird, kann ein Promise zurückgegeben werden, das Erfolg oder Misserfolg signalisiert.
  • write(chunk, controller): Diese Methode wird aufgerufen, wenn ein neuer Datenblock (angegeben in den chunk) kann in die zugrunde liegende Senke geschrieben werden. Sie kann ein Versprechen zurückgeben, signalisiert den Erfolg oder Misserfolg des Schreibvorgangs. Diese Methode wird erst nach der vorherigen Schreibvorgänge waren erfolgreich und nie, nachdem der Stream geschlossen oder abgebrochen wurde.
  • close(controller): Diese Methode wird aufgerufen, wenn die App signalisiert, dass der Schreibvorgang abgeschlossen ist. an den Stream zu senden. Der Inhalt sollte alles tun, was notwendig ist, um Schreibvorgänge in den der zugrunde liegenden Senke und geben den Zugriff darauf frei. Wenn dieser Prozess asynchron ist, kann er einen Erfolg oder Misserfolg zu signalisieren. Diese Methode wird erst aufgerufen, nachdem alle Schreibvorgänge in der Warteschlange erfolgreich waren.
  • abort(reason): Diese Methode wird aufgerufen, wenn die App signalisiert, dass sie abrupt geschlossen werden soll. in einen fehlerhaften Zustand versetzt werden. Damit können alle zurückgehaltenen Ressourcen bereinigt werden, close(), aber abort() wird aufgerufen, auch wenn Schreibvorgänge in die Warteschlange gestellt werden. Diese Blöcke werden dann weg sind. Wenn dieser Prozess asynchron ist, kann er ein Versprechen zurückgeben, um Erfolg oder Misserfolg zu signalisieren. Die Der Parameter reason enthält einen DOMString-Wert, der beschreibt, warum der Stream abgebrochen wurde.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Die WritableStreamDefaultController Schnittstelle der Streams API stellt einen Controller dar, über den der Status einer WritableStream gesteuert werden kann während der Einrichtung, da weitere Blöcke zum Schreiben eingereicht werden, oder am Ende des Textes. Beim Erstellen eine WritableStream ist, erhält die zugrunde liegende Senke eine entsprechende WritableStreamDefaultController Instanz, die bearbeitet werden soll. WritableStreamDefaultController hat nur eine Methode: WritableStreamDefaultController.error(), Dadurch werden zukünftige Interaktionen mit dem zugehörigen Stream fehlerhaft. WritableStreamDefaultController unterstützt auch ein signal-Attribut, das eine Instanz von AbortSignal, sodass ein WritableStream-Vorgang bei Bedarf beendet werden kann.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Das zweite, ebenfalls optionale Argument des WritableStream()-Konstruktors ist queuingStrategy. Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert, die zwei Parameter:

  • highWaterMark: Eine nicht negative Zahl, die das Hochwasser des Flusses angibt, für das diese Warteschlangenstrategie verwendet wird.
  • size(chunk): Eine Funktion, die die endliche nicht negative Größe des gegebenen Chunk-Werts berechnet und zurückgibt. Das Ergebnis wird verwendet, um den Rückstand zu bestimmen, der sich über das entsprechende WritableStreamDefaultWriter.desiredSize-Attribut zeigt.

Die Methoden getWriter() und write()

Zum Schreiben in einen beschreibbaren Stream benötigen Sie einen Writer. Dieser ist ein WritableStreamDefaultWriter Die Methode getWriter() der WritableStream-Schnittstelle gibt ein neue Instanz von WritableStreamDefaultWriter hinzu und sperrt den Stream auf diese Instanz. Während die Stream gesperrt ist, kann kein anderer Writer übernommen werden, bis der aktuelle Stream veröffentlicht wird.

Die write() der Methode WritableStreamDefaultWriter schreibt einen übergebenen Datenblock in einen WritableStream und seine zugrunde liegende Senke und gibt dann Ein Promise, das aufgelöst wird, um den Erfolg oder Misserfolg des Schreibvorgangs anzuzeigen. Beachten Sie, dass „Erfolg“ liegt bei der zugrunde liegenden Senke; wird möglicherweise angezeigt, dass der Block akzeptiert wurde. und nicht unbedingt an ihrem endgültigen Ziel gespeichert sind.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Das Attribut locked

Sie können überprüfen, ob ein beschreibbarer Stream gesperrt ist, indem Sie auf seine WritableStream.locked Property.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Codebeispiel für beschreibbaren Stream

Das Codebeispiel unten zeigt alle Schritte in Aktion.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Einen lesbaren Stream in einen beschreibbaren Stream einbinden

Ein lesbarer Stream kann über die Pipeline pipeTo()-Methode. ReadableStream.pipeTo() leitet den aktuellen ReadableStream an eine gegebene WritableStream weiter und gibt einen Versprechen, das sich erfüllt, wenn der Pipe-Prozess erfolgreich abgeschlossen wird, oder es ablehnt, wenn Fehler gefunden wurden.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Transformationsstream erstellen

Die TransformStream-Schnittstelle der Streams API stellt eine Reihe transformierbarer Daten dar. Ich Erstellen Sie einen Transformationsstream, indem Sie seinen Konstruktor TransformStream() aufrufen. Dieser erstellt dann Ein Transformationsstreamobjekt aus den angegebenen Handlern. Der TransformStream()-Konstruktor akzeptiert als Das erste Argument ist ein optionales JavaScript-Objekt, das die transformer darstellt. Solche Objekte können eine der folgenden Methoden enthalten:

transformer

  • start(controller): Diese Methode wird sofort aufgerufen, wenn das Objekt erstellt wird. Normalerweise wird verwendet, um Präfixblöcke mit controller.enqueue() in die Warteschlange zu stellen. Diese Blöcke werden gelesen lesbar sein, sind jedoch nicht von Schreibvorgängen auf die beschreibbare Seite abhängig. Wenn diese Initiale ist asynchron, z. B. weil das Abrufen der Präfix-Blöcke Die Funktion kann ein Versprechen zurückgeben, um Erfolg oder Misserfolg zu signalisieren. wird ein abgelehntes Promise den Fehler . Alle ausgelösten Ausnahmen werden vom TransformStream()-Konstruktor noch einmal ausgelöst.
  • transform(chunk, controller): Diese Methode wird aufgerufen, wenn ein neuer Block geschrieben wurde, der ursprünglich in den beschreibbar ist, kann transformiert werden. Die Stream-Implementierung garantiert, dass diese Funktion wird erst aufgerufen, nachdem vorherige Transformationen erfolgreich waren, und nie vor start() abgeschlossen wurde oder nachdem flush() aufgerufen wurde. Diese Funktion führt die eigentliche Transformation des Transformationsstreams. Die Ergebnisse können mit controller.enqueue() in die Warteschlange eingereiht werden. Dieses ermöglicht es einem einzelnen Block, der auf die beschreibbare Seite geschrieben wird, zu null oder mehreren Blöcken auf der Seite lesbar ist, je nachdem, wie oft controller.enqueue() aufgerufen wird. Wenn das Verfahren erfolgt die Transformation asynchron, kann diese Funktion ein Versprechen zurückgeben, der Transformation. Ein abgelehntes Promise führt sowohl auf der lesbaren als auch auf der beschreibbaren Seite des Transform-Stream. Ist die Methode transform() nicht angegeben, wird die Identitätstransformation verwendet, die stellt Blöcke unverändert von der beschreibbaren Seite zur lesbaren Seite in die Warteschlange.
  • flush(controller): Diese Methode wird aufgerufen, nachdem alle auf die beschreibbaren Seite geschriebenen Blöcke abgeschlossen sind. durch erfolgreich durchlaufene transform() transformiert. Die beschreibbare Seite wird gleich geschlossen. In der Regel wird dies verwendet, um Suffix-Chunks auf die lesbare Seite zu stellen, bevor diese ebenfalls in die Warteschlange gestellt wird geschlossen wird. Wenn der Leerungsprozess asynchron ist, kann die Funktion ein Promise an Erfolg oder Misserfolg signalisieren; wird das Ergebnis an den Aufrufer stream.writable.write() Außerdem führt ein abgelehntes Promise beschreibbaren Seiten des Streams. Das Auslösen einer Ausnahme wird wie das Zurückgeben einer abgelehnten versprochen.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Die Warteschlangenstrategien writableStrategy und readableStrategy

Der zweite und dritte optionale Parameter des TransformStream()-Konstruktors sind optional Warteschlangenstrategien writableStrategy und readableStrategy. Sie sind definiert wie in der readable und writable -Abschnitte.

Codebeispiel für Transformationsstream

Das folgende Codebeispiel zeigt einen einfachen Transformationsstream in Aktion.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Einen lesbaren Stream durch einen Transformationsstream leiten

Die pipeThrough() -Methode der ReadableStream-Schnittstelle bietet eine verkettbare Methode für das Piping des aktuellen Streams. Transformationsstream oder ein anderes beschreibbar/lesbares Paar. Das Piepton eines Streams wird in der Regel gesperrt für die Dauer der Pipe, sodass andere Lesegeräte sie nicht sperren können.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Im nächsten, etwas konstruierten Codebeispiel, Version von fetch() der den gesamten Text großgeschrieben, indem das zurückgegebene Antwortversprechen verwendet wird. als Stream und schreiben Sie Block für Block in Großbuchstaben. Der Vorteil dieses Ansatzes ist, dass Sie nicht auf das gesamte Dokument herunterzuladen, was bei großen Dateien einen großen Unterschied machen kann.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

Die folgende Demo zeigt lesbare, beschreibbare und transformierbare Streams in Aktion. Sie enthält auch Beispiele pipeThrough() und pipeTo() und zeigt ebenfalls tee(). Sie können optional Demo in einem eigenen Fenster öffnen oder Quellcode.

Nützliche Streams im Browser

Zahlreiche nützliche Streams sind direkt in den Browser integriert. Sie können ganz einfach ein ReadableStream aus einem Blob. Die Blob stream() der Schnittstelle gibt Einen ReadableStream, der nach dem Lesen die im Blob enthaltenen Daten zurückgibt. Denken Sie auch daran, File-Objekt ist eine bestimmte Art von Blob und können in jedem Kontext verwendet werden, den ein Blob hat.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Die Streamingvarianten von TextDecoder.decode() und TextEncoder.encode() werden aufgerufen TextDecoderStream und TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Mit der Funktion CompressionStream und DecompressionStream-Transformationsstreams . Das folgende Codebeispiel zeigt, wie Sie die Streams-Spezifikation herunterladen und mit gzip komprimieren können. direkt im Browser und schreiben die komprimierte Datei direkt auf die Festplatte.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Die File System Access API FileSystemWritableFileStream Die experimentellen fetch()-Anfragestreams sind Beispiele für beschreibbare Ströme in freier Wildbahn.

Die Serial API nutzt sowohl gut lesbare als auch schreibbare Streams intensiv.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Schließlich integriert die WebSocketStream API Streams in die WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Nützliches Infomaterial

Danksagungen

Artikel wurde geprüft von Jake Archibald François Beaufort Sam Dutton Mattias Buelens Surma, Joe Medley und Adam Rice. Die Blogposts von Jake Archibald helfen mir sehr dabei, Streams. Einige Codebeispiele sind vom GitHub-Nutzer inspiriert. @bellbind explorative Datenanalysen und Teile des Textes auf dem MDN-Webdokumente auf Streams Die Streams Standard Autoren haben bei der Erstellung und diese Spezifikation schreiben. Hero-Image von Ryan Lara auf Entspritzen.