מקורות נתונים – המדריך המלא

במאמר הזה תלמדו איך להשתמש בשידורים קריאים, ניתנים לכתיבה ולבצע בהם טרנספורמציה באמצעות Streams API.

Streams API מאפשר לגשת באופן פרוגרמטי למקורות נתונים שהתקבלו ברשת או שנוצר על ידי אמצעים מקומיים לעבד אותם באמצעות JavaScript. סטרימינג כולל פירוט של המשאב שרוצים לקבל, לשלוח או לבצע בו טרנספורמציה למקטעי נתונים קטנים, ואז לעבד אותם בהדרגה. אמנם סטרימינג הוא משהו כאשר דפדפנים עושים זאת בכל מקרה, כשמקבלים נכסים כמו HTML או סרטונים להצגה בדפי אינטרנט, יכולת זו מעולם לא הייתה זמינה ל-JavaScript לפני fetch, באמצעות זרמים שהושקו ב-2015.

בעבר, אם רציתם לעבד משאב כלשהו (למשל: סרטון, קובץ טקסט וכו'), תצטרכו להוריד את הקובץ כולו, להמתין עד שתתבצע פעולת deserialize לפורמט מתאים, ואז לעבד אותו. במסגרת השידורים הזמינים JavaScript, כל זה משתנה. עכשיו אפשר לעבד נתונים גולמיים באמצעות JavaScript בהדרגה ברגע שהוא זמין אצל הלקוח, בלי שיהיה צורך ליצור מאגר נתונים זמני, מחרוזת או blob. הנה מספר תרחישים לדוגמה:

  • אפקטים לווידאו: יצירת צנרת של וידאו קריא דרך רצף טרנספורמציה שמחילה אפקטים בזמן אמת.
  • ביטול דחיסה של נתונים: חילוץ נתונים של קובץ בסטרימינג דרך זרם טרנספורמציה שבאופן סלקטיבי (de) מכווצת אותו.
  • פענוח קוד של תמונות: שרשור של תגובת HTTP דרך זרם טרנספורמציה שמפענח בייטים לנתוני מפת סיביות (bitmap) ואז דרך זרם טרנספורמציה אחר שמתרגם את מפות הביטים לקובצי PNG. אם המיקום מוטמע ב-handler של fetch של קובץ שירות (service worker), כך שניתן יהיה לבצע פולי מילוי שקוף פורמטים חדשים של תמונה, כמו AVIF.

תמיכה בדפדפנים

ReadableStream ו-WritableStream

תמיכה בדפדפן

  • Chrome: 43.
  • קצה: 14.
  • Firefox: 65.
  • Safari: 10.1.

מקור

TransformStream

תמיכה בדפדפן

  • Chrome: 67.
  • קצה: 79.
  • Firefox: 102.
  • Safari: 14.1.

מקור

מושגי ליבה

לפני שאעבור לפרטים על הסוגים השונים של שידורים, אני רוצה להציג כמה מושגים בסיסיים.

גושים

מקטע הוא פיסת נתונים יחידה שנכתבת אל מקור נתונים או קוראת ממנו. הוא יכול להיות בכל type; יכולים אפילו להכיל מקטעים מסוגים שונים. ברוב המקרים, מקטעים לא יהיו הכי אטומיים יחידת נתונים עבור מקור נתונים נתון. לדוגמה, שידור חי של בייטים עשוי להכיל מקטעים שמורכבים מ-16. יחידות Uint8ArrayKiB, במקום בייטים בודדים.

שידורים שאפשר לקרוא

זרם קריא מייצג מקור של נתונים שמהם ניתן לקרוא. במילים אחרות, נתונים מגיעים מזרם נתונים קריא. באופן קונקרטי, זרם קריא הוא מופע של ReadableStream בכיתה.

שידורים חיים

זרם ניתן לכתיבה מייצג יעד לנתונים שאפשר לכתוב בו. במילים אחרות, נתונים נכנס לשידור שניתן לכתיבה. באופן קונקרטי, זרם ניתן לכתיבה הוא דוגמה כיתה אחת (WritableStream).

טרנספורמציה של זרמים

זרם טרנספורמציה מורכב מצמד של זרמים: זרם ניתן לכתיבה, המכונה צד לכתיבה, וזרם קריא, המכונה הצד הקריא שלו. מטאפורה מהעולם האמיתי תרגום סימולטני שמתרגם משפה אחת לאחרת בזמן אמת. באופן ספציפי לזרם הטרנספורמציה, כתיבה לצד הניתן לכתיבה, כך שנתונים חדשים הופכים לזמינים לקריאה הצד הקריא. באופן קונקרטי, כל אובייקט עם מאפיין writable ומאפיין readable יכול לפעול בתור זרם של טרנספורמציה. עם זאת, הכיתה הרגילה TransformStream מאפשרת ליצור בקלות רבה יותר כזה שהסתבך בצורה נכונה.

שרשראות לצינורות

השימוש העיקרי בשידורים מתבצע על ידי העברה שלהם זה לזה. ניתן לנתב זרם קריא של נתונים באופן ישיר לזרם ניתן לכתיבה, באמצעות השיטה pipeTo() של השידור הקריא, או לצינור עיבוד נתונים או יותר לבצע טרנספורמציה של שידורים קודם, באמצעות השיטה pipeThrough() של הזרם הקריא. קבוצה של אם הם חוברו יחדיו בדרך הזו, הם נקראים 'שרשרת צינורות'.

לחץ גב

לאחר בניית שרשרת צינורות, היא תפיץ אותות לגבי מהירות הזרימה של המקטעים לקרוא אותו. אם שלב כלשהו בשרשרת עדיין לא יכול לקבל מקטעים, הוא מפיץ אות אחורה. דרך שרשרת הצינורות, עד שלבסוף נאמר למקור המקורי להפסיק לייצר מקטעים מהר. תהליך הנירמול של הזרימה נקרא לחץ לאחור.

חולצת טי

ניתן לערוך זרם קריא (לשם כך על שם הצורה של 'T') באותיות רישיות באמצעות שיטת ה-tee() שלו. הפעולה הזו תינעל את השידור, כלומר לא תהיה יותר אפשרות להשתמש בו באופן ישיר. אבל הוא ייצור שניים סטרימינג, שנקראים הסתעפויות, ואפשר להשתמש בו בנפרד. גם האפשרות של העברת שידורים חשובה כי לא ניתן להעביר או להפעיל מחדש שידורים. נרחיב על כך בהמשך.

תרשים של שרשרת צינורות שמורכבת מזרם קריא, שמגיע מקריאה ל-API לאחזור. לאחר מכן מועבר דרך זרם טרנספורמציה שהפלט שלו הוא te [ממחרוזת], ואז נשלח לדפדפן לזרם הקריא הראשון שנוצר ולמטמון של קובץ השירות (service worker) עבור הזרם הקריא השני שנוצר.
שרשרת צינורות.

המכניקה של שידור קריא

מקור נתונים קריא הוא מקור נתונים שמיוצג ב-JavaScript על ידי אובייקט ReadableStream ש עובר ממקור בסיסי. ReadableStream() constructor יוצר ומחזיר אובייקט סטרימינג קריא מה-handlers הנתונים. יש שתי פלטפורמות סוגים של מקור בסיסי:

  • מקורות נתונים בדחיפה דוחפים אתכם כל הזמן לנתונים ברגע שנכנסתם אליהם, והשליטה בידיים שלכם. להפעיל, להשהות או לבטל את הגישה לשידור. דוגמאות: וידאו בשידור חי, אירועים שנשלחו משרת, או WebSockets.
  • כשמשתמשים במקורות שליפת, צריך לבקש מהם נתונים באופן מפורש אחרי ההתחברות אליהם. דוגמאות לכלול פעולות HTTP דרך קריאות fetch() או XMLHttpRequest.

נתוני 'עדכונים' נקראים ברצף בחלקים קטנים שנקראים מקטעים. המקטעים שמוצבים בסטרימינג חייבים להיות נוספים לתור. כלומר, הם ממתינים בתור מוכנים לקריאה. תור פנימי עוקב אחר המקטעים שעדיין לא נקראו.

אסטרטגיית הוספה בתור היא אובייקט שקובעת איך זרם צריך לאותת לחץ לאחור על סמך במצב של התור הפנימי. אסטרטגיית ההוספה בתור מקצה גודל לכל מקטע ומשווה הגודל הכולל של כל המקטעים בתור למספר מסוים, שנקרא סימן מים גבוה.

קורא קורא את המקטעים שבשידור. הקורא מאחזר את הנתונים של מקטע אחד בכל פעם כך שתוכלו לבצע כל סוג של פעולה שתרצו לבצע. קורא המסך והשני עיבוד קוד שצורף יחד איתו, שנקרא צרכן.

המבנה הבא בהקשר הזה נקרא נאמן מידע. לכל זרם ניתן לקריאה משויך שלט רחוק, שכששמו מרמז, מאפשר לכם לשלוט בשידור.

רק קורא אחד יכול לקרוא שידור בכל פעם. כאשר קורא נוצר ומתחיל לקרוא זרם (כלומר, הופך לקורא פעיל), הוא נעול אליו. אם אתם רוצים שקורא אחר יטפל בכם בקריאת השידור, בדרך כלל צריך לשחרר את הקורא הראשון לפני שמבצעים פעולות אחרות (למרות שאפשר לבדוק שידורים).

יצירת עדכוני תוכן שניתן לקרוא

כדי ליצור שידור סטרימינג קריא, צריך לשלוח קריאה לבונה שלו ReadableStream() ל-constructor יש ארגומנט אופציונלי underlyingSource, שמייצג אובייקט באמצעות שיטות ומאפיינים שמגדירים את אופן הפעולה של המכונה של השידור המובנה.

underlyingSource

אפשר להשתמש ב-methods האופציונליות הבאות שמוגדרות על ידי המפתח:

  • start(controller): מתבצעת קריאה מיידית כשהאובייקט מורכב. יכולים לגשת למקור השידור ולעשות כל דבר אחר שנדרשות להגדרת פונקציונליות הסטרימינג. אם התהליך הזה מתבצע באופן אסינכרוני, השיטה יכולה להחזיר הבטחה לאותת הצלחה או כישלון. הפרמטר controller שמועבר ל-method הזה הוא A ReadableStreamDefaultController.
  • pull(controller): אפשר להשתמש בה כדי לשלוט בשידור בזמן אחזור של מקטעים נוספים. הוא נשלחת שוב ושוב כל עוד תור המקטעים הפנימי של השידור לא מלא, עד שתור מגיע לסימן המים הגבוה שלו. אם התוצאה של התקשרות אל pull() היא הבטחה, לא תתבצע שיחה חוזרת אל pull() עד שההבטחה הזו תמומש. אם ההבטחה תידחה, השידור יהפוך לשגיאה.
  • cancel(reason): קבלת שיחה כשצרכן השידור יבטל את השידור.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ב-ReadableStreamDefaultController יש תמיכה בשיטות הבאות:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

הארגומנט השני, שגם הוא אופציונלי, של ה-constructor של ReadableStream() הוא queuingStrategy. זהו אובייקט שמגדיר באופן אופציונלי אסטרטגיית הוספה לתור של השידור, שנדרשת :

  • highWaterMark: מספר לא שלילי שמציין את סימן המים הגבוה של הזרם באמצעות שיטת ההוספה הזו לתור.
  • size(chunk): פונקציה שמחשבת ומחזירה את הגודל הסופי הלא שלילי של ערך המקטע הנתון. התוצאה משמשת לקביעת לחץ לאחור, מתבטאת באמצעות מאפיין ReadableStreamDefaultController.desiredSize המתאים. הוא גם קובע מתי מתבצעת קריאה ל-method של pull() של המקור הבסיסי.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

השיטות getReader() ו-read()

כדי לקרוא מזרם 'זמין לקריאה', נדרש קורא, ReadableStreamDefaultReader השיטה getReader() בממשק ReadableStream יוצרת קורא ונועלת את השידור את זה. בזמן שהשידור נעול, לא ניתן לצרף קורא אחר עד שהקוראים האלה ישוחררו.

read() ה-method של הממשק ReadableStreamDefaultReader מחזירה הבטחה שמעניקה גישה מקטע בתור הפנימי של הזרם. הוא משלים או דוחה תוצאה בהתאם למצב בזרם. אלו האפשרויות:

  • אם מקטע זמין, ההבטחה תמומש באמצעות אובייקט בטופס
    { value: chunk, done: false }.
  • אם השידור ייסגר, ההבטחה תמומש באמצעות אובייקט שצורף לטופס
    { value: undefined, done: true }.
  • אם מתקבלת שגיאה בשידור, ההבטחה תידחה עם השגיאה הרלוונטית.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

הנכס locked

כדי לבדוק אם שידור ניתן לקריאה נעול, אפשר לגשת אליו ReadableStream.locked לנכס.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

דוגמאות של קוד שידור ניתן לקריאה

בדוגמת הקוד שבהמשך אפשר לראות את כל השלבים בפעולה. קודם כל צריך ליצור ReadableStream הארגומנט underlyingSource (כלומר, המחלקה TimestampSource) מגדיר שיטה start(). השיטה הזו מנחה את ה-controller של השידור ל- enqueue() חותמת זמן כל שנייה במהלך עשר שניות. לבסוף, היא מנחה את השלט רחוק close() את השידור. אתה צריך לשדר על ידי יצירת קורא באמצעות השיטה getReader() והפעלת read() עד שהשידור done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

איטרציה אסינכרונית

חשוב לבדוק כל חזרה בלופ של read() אם הסטרימינג הוא done – זה לא ה-API הנוח ביותר. למזלך, בקרוב תהיה דרך טובה יותר לעשות זאת: איטרציה אסינכרונית.

for await (const chunk of stream) {
  console.log(chunk);
}

פתרון עקיף לשימוש היום באיטרציה אסינכרונית הוא להטמיע את ההתנהגות באמצעות polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

יצירת טיזר של זרם קריא

ה-method tee() של הממשק ReadableStream מכניס את הזרם הקריא הנוכחי, ומחזיר מערך של שני רכיבים שמכיל את שתי ההסתעפויות שהתקבלו כמכונות ReadableStream חדשות. כך אפשר שני קוראים לקרוא עדכוני תוכן בו-זמנית. לדוגמה, תוכלו לעשות זאת ב-Service Worker אם כשרוצים לאחזר תגובה מהשרת ולשדר אותה לדפדפן, אך גם לשדר אותה בסטרימינג מטמון של קובץ שירות (service worker). לא ניתן לצרוך גוף תשובה יותר מפעם אחת, לכן צריך שני עותקים לשם כך. כדי לבטל את השידור, צריך לבטל את שני ההסתעפויות שנוצרו. רעשי רקע בדרך כלל הוא יינעל למשך פרק הזמן החדש, כך שלא תהיה אפשרות לקרוא אותו בקוראים אחרים.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

שידורי בייטים קריאים

בשידורים שמייצגים בייטים, מסופקת גרסה מורחבת של הזרם הקריא לשימוש. בייטים ביעילות, במיוחד על ידי צמצום העותקים. סטרימינג של בייטים מאפשרים ליצור מאגר נתונים זמני קוראים (BYOB) לרכישה. הטמעת ברירת המחדל יכולה לספק מגוון של פלטים שונים, בתור מחרוזות או מאגרי מערכים במקרה של WebSockets, וסטרימינג של בייטים מבטיחים פלט של בייטים. בנוסף, לקוראי BYOB יש יתרונות ליציבות. הדבר כי אם מאגר נתונים זמני מתנתק, הוא יכול להבטיח שמישהו לא יכתוב פעמיים באותו מאגר נתונים זמני, ולכן נמנעים ממרוץ התהליכים. קוראי BYOB יכולים לצמצם את מספר הפעמים שבהן הדפדפן צריך לפעול לאיסוף אשפה, כי ניתן לעשות בו שימוש חוזר במאגרי נתונים זמניים.

יצירת שידור חי של בייטים שניתן לקרוא

כדי ליצור שידור עם בייטים קריאים, צריך להעביר פרמטר type נוסף אל ReadableStream() constructor.

new ReadableStream({ type: 'bytes' });

underlyingSource

המקור הבסיסי של זרם בבייטים שניתן לקרוא מקבל ReadableByteStreamController ל- ולבצע מניפולציה. השיטה ReadableByteStreamController.enqueue() שלה מקבלת ארגומנט chunk שהערך שלו הוא ArrayBufferView. המאפיין ReadableByteStreamController.byobRequest מחזיר את הערך הנוכחי בקשת שליפת BYOB, או null אם אין. לבסוף, ReadableByteStreamController.desiredSize מחזיר את הגודל הרצוי כדי למלא את התור הפנימי של הזרם המבוקר.

queuingStrategy

הארגומנט השני, שגם הוא אופציונלי, של ה-constructor של ReadableStream() הוא queuingStrategy. זהו אובייקט שמגדיר באופן אופציונלי אסטרטגיית הוספה לתור של הזרם, שלוקחת :

  • highWaterMark: מספר לא שלילי של בייטים שמציין את סימן המים הגבוה של הזרם באמצעות שיטת הוספת הבייטים הזו לתור. הוא משמש לקביעת לחץ לאחור, שמתבטא באמצעות מאפיין ReadableByteStreamController.desiredSize המתאים. הוא גם קובע מתי מתבצעת קריאה ל-method של pull() של המקור הבסיסי.

השיטות getReader() ו-read()

לאחר מכן אפשר לקבל גישה אל ReadableStreamBYOBReader על ידי הגדרת הפרמטר mode בהתאם: ReadableStream.getReader({ mode: "byob" }). כך אפשר לקבל שליטה מדויקת יותר על מאגר הנתונים הזמני הקצאה כדי להימנע מיצירת עותקים. כדי לקרוא מהזרם של הבייטים, צריך לבצע קריאה ReadableStreamBYOBReader.read(view), כאשר view הוא ArrayBufferView.

דוגמת קוד שידור בייט שניתן לקריאה

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

הפונקציה הבאה מחזירה זרמים של בייטים קריאים שמאפשרים קריאה יעילה של עותק אפס מערך שנוצר באופן אקראי. במקום להשתמש בגודל מקטע שנקבע מראש (1,024), הוא מנסה למלא מאגר הנתונים הזמני שהמפתח מספק, ומאפשר שליטה מלאה.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

המכניקה של סטרימינג ניתן לכתיבה

זרם ניתן לכתיבה הוא יעד שבו ניתן לכתוב נתונים, המיוצגים ב-JavaScript באמצעות אובייקט WritableStream. הזה משמשת כהפשטה מעל כיור פנימי – כיור קלט/פלט ברמה נמוכה יותר שאליו נתונים גולמיים נכתבים.

הנתונים נכתבים אל מקור הנתונים באמצעות כתיבה, מקטע אחד בכל פעם. מקטע יכול לקחת צורות רבות, בדיוק כמו החתכים בקורא. אפשר להשתמש בכל קוד שרוצים להפיק את המקטעים המוכנים לכתיבה; הכותב והקוד המשויך נקראים מפיק.

כאשר כותב נוצר ומתחיל לכתוב לשידור (כותב פעיל), נאמר שהוא נעול אליו. רק מחבר אחד יכול לכתוב בסטרימינג שניתן לכתיבה בכל פעם. אם רוצים להוסיף כדי להתחיל לכתוב בסטרימינג, בדרך כלל צריך לפרסם אותו לפני שניתן לצרף עם כותב אחר.

תור פנימי עוקב אחר המקטעים שנכתבו לשידור אבל עדיין לא מעובד על ידי ה-sink הבסיסי.

אסטרטגיית הוספה בתור היא אובייקט שקובעת איך זרם צריך לאותת לחץ לאחור על סמך במצב של התור הפנימי. אסטרטגיית ההוספה בתור מקצה גודל לכל מקטע ומשווה הגודל הכולל של כל המקטעים בתור למספר מסוים, שנקרא סימן מים גבוה.

המבנה הסופי נקרא בקר. לכל שידור ניתן לכתיבה יש בקר מאפשרת לשלוט בשידור (לדוגמה, לבטל אותו).

יצירת שידור שניתן לכתיבה

הממשק WritableStream של ה-Stream API מספק הפשטה סטנדרטית של כתיבת נתונים בסטרימינג ליעד, הידוע בכיור. האובייקט הזה כולל לחץ לאחור והוספה לרשימת תור. יצירת שידור שניתן לכתיבה מתבצעת על ידי לקרוא ל-constructor שלו WritableStream() יש בו פרמטר underlyingSink אופציונלי, שמייצג אובייקט. באמצעות שיטות ומאפיינים שמגדירים את אופן הפעולה של המכונה של השידור המובנה.

underlyingSink

underlyingSink יכול לכלול את השיטות האופציונליות הבאות שמוגדרות על ידי המפתח. controller שמועבר לחלק מהשיטות הוא WritableStreamDefaultController.

  • start(controller): השיטה הזו מופעלת מיד לאחר בניית האובייקט. תכנים בשיטה הזו צריכים להיות גישה ל-sink הבסיסי. אם התהליך הזה נועד באופן אסינכרוני, הוא יכול להחזיר הבטחה לאותת הצלחה או כישלון.
  • write(chunk, controller): תתבצע קריאה לשיטה הזו כאשר תבוצע מקטע נתונים חדש (שצוין בקטע chunk) מוכן לכתיבה ל-sink הבסיסי. הוא יכול להחזיר הבטחה הצלחה או כשל בפעולת הכתיבה. תתבצע קריאה לשיטה הזו רק אחרי שהקריאה הקודמת הסתיים בהצלחה, ואף פעם לא לאחר הסגירה או ביטול של השידור.
  • close(controller): תתבצע קריאה לשיטה הזו אם האפליקציה מאשרת שהיא סיימה לכתוב של מקטעים בשידור. התוכן צריך לעשות את כל מה שנדרש כדי לסיים את הכתיבה ב-sink שבבסיס שלהן, ומשחררים את הגישה אליו. אם התהליך הזה אסינכרוני, הוא יכול להחזיר מבטיחים להראות הצלחה או כישלון. תתבצע קריאה לשיטה הזו רק לאחר סיום הכתיבה של כל הצליחו.
  • abort(reason): תתבצע קריאה לשיטה הזו אם האפליקציה מזהה שהיא רוצה להיסגר בפתאומיות את הסטרימינג, ולהעביר אותו למצב שגיאה. הוא יכול לנקות כל משאב שמור, כמו close(), אבל תתבצע שיחה אל abort() גם אם הודעות הכתיבה נמצאות בתור. המקטעים האלה יישללו לא נמצא. אם התהליך הזה אסינכרוני, הוא יכול להחזיר הבטחה לאותת הצלחה או כישלון. הפרמטר reason מכיל DOMString שמתאר את הסיבה לביטול הסטרימינג.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

WritableStreamDefaultController הממשק של Streams API מייצג בקר שמאפשר שליטה במצב של WritableStream במהלך ההגדרה, כי עוד מקטעים נשלחים לכתיבה או בסוף הכתיבה. בזמן הבנייה WritableStream, ל-sink הבסיס מוקצה WritableStreamDefaultController תואם כדי לבצע מניפולציה. ל-WritableStreamDefaultController יש רק שיטה אחת: WritableStreamDefaultController.error(), שגורם לשגיאות באינטראקציות עתידיות עם הזרם המשויך. הפונקציה WritableStreamDefaultController תומכת גם במאפיין signal שמחזיר מופע של AbortSignal, כך שאפשר יהיה להפסיק פעולה של WritableStream במקרה הצורך.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

הארגומנט השני, שגם הוא אופציונלי, של ה-constructor של WritableStream() הוא queuingStrategy. זהו אובייקט שמגדיר באופן אופציונלי אסטרטגיית הוספה לתור של השידור, שנדרשת :

  • highWaterMark: מספר לא שלילי שמציין את סימן המים הגבוה של הזרם באמצעות שיטת ההוספה הזו לתור.
  • size(chunk): פונקציה שמחשבת ומחזירה את הגודל הסופי הלא שלילי של ערך המקטע הנתון. התוצאה משמשת לקביעת לחץ לאחור, מתבטאת באמצעות מאפיין WritableStreamDefaultWriter.desiredSize המתאים.

השיטות getWriter() ו-write()

כדי לכתוב בזרם ניתן לכתיבה, אתה צריך 'כותב', שיהיה WritableStreamDefaultWriter ה-method getWriter() בממשק WritableStream מחזירה המופע החדש של WritableStreamDefaultWriter וננעל את השידור למכונה הזו. אומנם השידור נעול, לא ניתן להשיג כותבים אחרים עד שהשידור הנוכחי ישוחרר.

write() ה-method של WritableStreamDefaultWriter הממשק כותב מקטע נתונים שמועבר אל WritableStream ול-sink הבסיסי שלו, ואז מחזיר הבטחה שנוצרה באופן שמעיד על הצלחה או כישלון של פעולת הכתיבה. שימו לב ש "הצלחה" שפירושו הוא עד הכיור הבסיסי; יכול להיות שיציין שהמקטע התקבל, ולא בהכרח לשמור אותו באופן בטוח ביעד הסופי שלו.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

הנכס locked

כדי לבדוק אם שידור שניתן לכתיבה נעול, אפשר לגשת WritableStream.locked לנכס.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

דוגמת קוד שידור ניתן לכתיבה

בדוגמת הקוד שמוצגת בהמשך אפשר לראות את כל השלבים בפעולה.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

העברת זרם ניתן לקריאה אל זרם שניתן לכתיבה

ניתן לנתב זרם קריא לזרם שקריא דרך pipeTo(). הפונקציה ReadableStream.pipeTo() מעבירה את ה-ReadableStream הנוכחי ל-WritableStream נתון ומחזירה מבטיח שתהליך ההעברה יושלם בהצלחה, או שהוא ידחה אם יש שגיאות המערכת נתקלה בבעיה.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

יצירת זרם של טרנספורמציה

ממשק TransformStream של Streams API מייצג קבוצה של נתונים שניתנים לטרנספורמציה. שלך ליצור זרם טרנספורמציה על ידי קריאה ל-constructor שלו TransformStream(), שיוצר ומחזיר אובייקט זרם של טרנספורמציה מה-handlers הנתונים. ה-constructor של TransformStream() מקבל בתור הארגומנט הראשון שלו, אובייקט JavaScript אופציונלי שמייצג את transformer. אובייקטים כאלה יכולים מכילים אחת מהשיטות הבאות:

transformer

  • start(controller): השיטה הזו מופעלת מיד לאחר בניית האובייקט. בדרך כלל משמש להוספת מקטעי קידומת לתור, באמצעות controller.enqueue(). המערכת תקרא את המקטעים האלה מהצד הקריא, אך לא תלויים בכתיבה בצד של הכתיבה. אם האות שמייצגת את שם המשתמש הוא אסינכרוני, לדוגמה כי נדרש מאמץ מסוים כדי להשיג את מקטעי הקידומת, הפונקציה יכולה להחזיר הבטחה לאותת הצלחה או כישלון. אם הבטחה שנדחתה, . כל חריג שנפסל ייפסל מחדש על ידי ה-constructor של TransformStream().
  • transform(chunk, controller): השיטה הזו מופעלת כאשר מקטע חדש נכתב במקור הצד הניתן לכתיבה מוכן לשינוי. הטמעת הזרם מבטיחה שהפונקציה תיקרא רק אחרי שהשינויים הקודמים יסתיימו בהצלחה, ואף פעם לא לפני start() שהושלמו או אחרי הקריאה של flush(). הפונקציה הזו מבצעת את הטרנספורמציה בפועל בעולם הטרנספורמציה. הוא יוכל להוסיף את התוצאות לתור באמצעות controller.enqueue(). הזה מאפשר מקטע יחיד שנכתב בצד הניתן לכתיבה כדי ליצור אפס או מספר מקטעים הצד הקריא, בהתאם למספר הפעמים שנשלחת ל-controller.enqueue() קריאה. אם התהליך של הטרנספורמציה היא אסינכרונית, הפונקציה הזו יכולה להחזיר הבטחה לאותת הצלחה או כשל של הטרנספורמציה. הבטחה שנדחתה תגרום לשגיאה גם בצד הקריא וגם בניסוח של המודעה ונבצע טרנספורמציה של זרם נתונים. אם לא תספקו שיטת transform(), המערכת תשתמש בטרנספורמציה של הזהות, מעביר מקטעים בתור ללא שינוי מהצד הניתן לכתיבה לצד הקריא.
  • flush(controller): לשיטה הזו קוראים אחרי שכל המקטעים שנכתבו לצד ניתן לכתיבה על ידי מעבר בהצלחה דרך transform(), והצד הניתן לכתיבה עומד להיות נסגר. בדרך כלל משתמשים בשיטה הזו כדי להוסיף מקטעי סיומת לתור לצד הקריא, עוד לפני כן נסגר. אם תהליך הניקוי הוא אסינכרוני, הפונקציה יכולה להחזיר הבטחה לסמן הצלחה או כישלון. התוצאה תישלח למתקשר stream.writable.write() בנוסף, הבטחה שנדחתה תגרום לשגיאה גם בקריאה וגם שניתן לכתוב בסטרימינג. ביטול חריגה נחשב כהחזרה של הודעה שנדחתה מבטיחים.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

שיטות ההוספה לתור writableStrategy ו-readableStrategy

הפרמטר האופציונלי השני והשלישי של ה-constructor של TransformStream() הם אופציונליים שיטות בידינג writableStrategy ו-readableStrategy בתור. הם מוגדרים כפי שמתואר קריא והזרם לכתיבה בקטעים השונים בהתאמה.

שינוי דוגמת הקוד של השידור

דוגמת הקוד הבאה מציגה זרם טרנספורמציה פשוט בפעולה.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

פיתוח של זרם קריא דרך זרם טרנספורמציה

pipeThrough() ה-method של הממשק ReadableStream מספקת דרך שרשרת-שרשרטים של השידור הנוכחי באמצעות זרם טרנספורמציה או כל צמד אחר של כתיבה/קריאה. העברת צינור בסטרימינג בדרך כלל ננעלת לכל אורך הצינור, כדי למנוע מקוראים אחרים לנעול אותו.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

דוגמת הקוד הבאה (מעט שגויה) ממחישה איך אפשר ליישם 'צעקה' גרסה של fetch() שממירה את כל הטקסט לאותיות רישיות, על ידי שימוש בהבטחת התשובה המוחזרת כסטרימינג ומגדילים את החלק העליון של מקטעים. היתרון של גישה זו הוא שלא צריך להמתין את המסמך כולו להורדה, מה שיכול לעשות הבדל גדול מאוד כשעובדים עם קבצים גדולים.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

הדגמה (דמו)

בהדגמה הבאה אפשר לראות קטעים קריאים, ניתנים לכתיבה וטרנספורמציה בפעולה. כולל גם דוגמאות של שרשראות צינורות pipeThrough() ו-pipeTo(), וגם tee(). אפשר להריץ את ההדגמה בחלון משלה, או צופים קוד מקור.

שידורים חיים זמינים בדפדפן

הדפדפן כולל מספר שידורים שימושיים שמובְנים בו. אפשר ליצור בקלות ReadableStream מ-blob. Blob המתודה stream() בממשק מחזירה ReadableStream, שכשהקריאה מחזירה את הנתונים שנכללים ב-blob. כמו כן, צריך לזכור אובייקט File הוא סוג ספציפי של Blob, ואפשר להשתמש בו בכל הקשר שבו blob יכול.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

הווריאנטים של סטרימינג של TextDecoder.decode() ושל TextEncoder.encode() נקראות TextDecoderStream וגם TextEncoderStream בהתאמה.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

לדחוס קובץ או לבטל דחיסה שלו בקלות באמצעות CompressionStream ו- DecompressionStream טרנספורמציה של זרמים בהתאמה. דוגמת הקוד הבאה מראה איך אפשר להוריד את המפרט של 'עדכונים', לדחוס (gzip) את המפרט של 'עדכונים' ישירות בדפדפן, ולכתוב את הקובץ הדחוס ישירות לדיסק.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

של File System Access API FileSystemWritableFileStream וזרמי הבקשות הניסיוניים fetch() הם דוגמאות לזרמים שניתן לכתיבה בטבע.

ה-Series API עושה שימוש נרחב גם בשידורים קריאים וגם בשידורים ניתנים לכתיבה.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

לבסוף, ה-API של WebSocketStream משלב שידורים עם WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

משאבים שימושיים

אישורים

המאמר הזה נבדק על ידי ג'ייק ארצ'יבלד, פרנסואה ביופורט, סם דוטון, מתיאס בולנס, Surma, ג'ו מדלי, וגם אדם רייס. הפוסטים של ג'ייק ארצ'יבלד עזרו לי מאוד להבין בסטרימינג. חלק מדוגמאות הקוד הן בהשראת משתמש GitHub הניתוחים של @bellbind חלקים בפרוזה מסתמכים בעיקר על MDN Web Docs ב-Streams של Streams Standard מחברים עשו עבודה אדירה באמצעות המפרט הזה. התמונה הראשית (Hero) של ריאן לארה מופעלת ביטול הפתיחה.