מקורות נתונים – המדריך המלא

איך משתמשים ב-Streams API כדי לקרוא, לכתוב ולבצע טרנספורמציה של מקורות נתונים (streams)

Streams API מאפשר לכם לגשת באופן פרוגרמטי למקורות נתונים שהתקבלו ברשת או נוצרו בכל אמצעי מקומי, ולעבד אותם באמצעות JavaScript. סטרימינג כולל פירוק של משאב שרוצים לקבל, לשלוח או לשנות למקטעים קטנים, ולאחר מכן עיבוד של המקטעים האלה ביט אחרי ביט. דפדפנים מבצעים סטרימינג בכל מקרה כשהם מקבלים נכסים כמו HTML או סרטונים שמוצגים בדפי אינטרנט, אבל היכולת הזו לא הייתה זמינה ל-JavaScript לפני שהתכונה fetch עם סטרימינג הושקה ב-2015.

בעבר, אם רצית לעבד משאב כלשהו (סרטון, קובץ טקסט וכו'), היה עליך להוריד את הקובץ כולו, להמתין עד שהוא יתבצע דה-סריאליזציה לפורמט מתאים ואז לעבד אותו. כשהסטרים זמינים ל-JavaScript, כל זה משתנה. מעכשיו אפשר לעבד נתונים גולמיים באמצעות JavaScript באופן הדרגתי ברגע שהם זמינים בצד הלקוח, בלי צורך ליצור מאגר נתונים זמני, מחרוזת או blob. כך אפשר להשתמש במספר תרחישים לדוגמה, חלק מהם מפורטים בהמשך:

  • אפקטים לווידאו: קידוד של שידור וידאו קריא דרך טרנספורמציה של אפקטים שמופעלים בזמן אמת.
  • (הסרת) דחיסת נתונים: העברת מקור נתונים דרך מקור נתונים מותאם שמבצע (הסרת) דחיסה באופן סלקטיבי.
  • פענוח קוד של תמונות: העתקה של תגובת HTTP בסטרימינג דרך טרנספורמציה שמפענחת בייטים לנתוני מפת סיביות, ואז דרך שידור טרנספורמציה אחר שמתרגם את מפות הביטים לקובצי PNG. אם מתקינים את הקוד בתוך הטיפולן fetch של שירות ה-worker, אפשר להשתמש בו כדי לבצע פוליגונום שקוף של פורמטים חדשים של תמונות, כמו AVIF.

תמיכה בדפדפנים

ReadableStream ו-WritableStream

תמיכה בדפדפנים

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

מקור

TransformStream

תמיכה בדפדפנים

  • Chrome:‏ 67.
  • קצה: 79.
  • Firefox: 102.
  • Safari:‏ 14.1.

מקור

מושגי ליבה

לפני שאעבור לפרטים על הסוגים השונים של שידורים, אני רוצה להציג כמה מושגים בסיסיים.

גושים

מקטע הוא פיסת נתונים יחידה שנכתבת אל מקור נתונים או קוראת ממנו. הוא יכול להיות מכל סוג, והזרמים יכולים לכלול גם קטעים מסוגים שונים. ברוב המקרים, מקטע לא יהיה יחידת הנתונים האטומית ביותר של מקור נתונים נתון. לדוגמה, זרם נתונים של בייטים עשוי להכיל מקטעים שמורכבים מ-16KiB יחידות Uint8Array במקום בייטים בודדים.

מקורות נתונים שניתנים לקריאה

מקור נתונים שאפשר לקרוא מייצג מקור נתונים שאפשר לקרוא ממנו. במילים אחרות, הנתונים יוצאים מזרם שניתן לקריאה. באופן ספציפי, מקור נתונים לקריאה הוא מופע של המחלקה ReadableStream.

שידורים חיים

זרם ניתן לכתיבה מייצג יעד לנתונים שאפשר לכתוב בו. במילים אחרות, הנתונים נכנסים למקור נתונים שאפשר לכתוב בו. באופן קונקרטי, שידור ניתן לכתיבה הוא מכונה של המחלקה WritableStream.

טרנספורמציה של מקורות נתונים

מקור נתונים לטרנספורמציה מורכב מזוג מקורות נתונים: מקור נתונים לכתיבה, שנקרא הצד לכתיבה, ומקור נתונים לקריאה, שנקרא הצד לקריאה. מטאפורה לכך בעולם האמיתי היא מתרגם סימולטני שמתרגם משפה אחת לשפה אחרת בזמן אמת. באופן ספציפי לזרם הטרנספורמציה, כתיבה בצד הניתן לכתיבה יוצרת נתונים חדשים לקריאה מהצד הקריא. באופן ספציפי, כל אובייקט עם מאפיין writable ומאפיין readable יכול לשמש בתור זרם טרנספורמציה. עם זאת, השימוש ב-class הסטנדרטי TransformStream מקל על יצירת זוג כזה שמקושר בצורה נכונה.

שרשראות לצינורות

השימוש העיקרי בזרמים הוא העברה (piping) שלהם זה לזה. אפשר להעביר זרם לקריאה ישירות לזרם לכתיבה באמצעות השיטה pipeTo() של הזרם לקריאה, או להעביר אותו דרך זרם טרנספורמציה אחד או יותר באמצעות השיטה pipeThrough() של הזרם לקריאה. קבוצה של מקורות נתונים שמחוברים יחד בצינור נקראת שרשרת צינורות.

לחץ חזרה

אחרי בניית שרשרת הצינורות, היא תפיץ אותות לגבי המהירות שבה מקטעים צריכים לעבור דרכה. אם שלב כלשהו בשרשרת עדיין לא יכול לקבל קטעי קוד, הוא מעביר אות לאחור דרך שרשרת הצינור, עד שבסופו של דבר המקור המקורי מקבל הודעה להפסיק לייצר קטעי קוד במהירות כזו. תהליך הנירמול של הזרימה נקרא לחץ לאחור.

הנחיתה על המגרש

אפשר להשתמש בשיטה tee() כדי ליצור יציאה מרכזית (נקראת כך בגלל הצורה שלה, 'T' גדול) לשידור שניתן לקריאה. הפעולה הזו תנעל את הסטרימינג, כלומר לא תהיה אפשרות להשתמש בו באופן ישיר. עם זאת, היא תיצור שני סטרימינגים חדשים, שנקראים ענפים, שאפשר לצרוך אותם בנפרד. חשוב גם להתחיל את השידור בזמן הנכון כי אי אפשר להריץ אותו לאחור או להפעיל אותו מחדש. נרחיב על כך בהמשך.

תרשים של רצף צינור
שרשרת צינורות.

המנגנון של מקור נתונים קריא

מקור נתונים שאפשר לקרוא אותו הוא מקור נתונים שמיוצג ב-JavaScript באמצעות אובייקט ReadableStream שמגיע ממקור בסיסי. ה-constructor ReadableStream() יוצר ומחזיר אובייקט סטרימינג קריא מה-handlers הנתונים. יש שני סוגים של מקורות בסיסיים:

  • מקורות דחיפה שולחים נתונים כל הזמן אחרי שמקבלים אליהם גישה, ועליך להתחיל, להשהות או לבטל את הגישה לשידור. דוגמאות לכך הן שידורי וידאו חיים, אירועים שנשלחים מהשרת או WebSockets.
  • במקורות משיכה צריך לבקש מהם נתונים באופן מפורש אחרי שמתחברים אליהם. דוגמאות לכך הן פעולות HTTP באמצעות קריאות fetch() או XMLHttpRequest.

נתוני מקור הנתונים נקרא ברצף בחלקים קטנים שנקראים קטעים. המקטעים שמוצבים בסטרימינג חייבים להיות נוספים לתור. כלומר, הם ממתינים בתור ומוכן לקריאה. תור פנימי עוקב אחר המקטעים שעדיין לא נקראו.

שיטת תורים היא אובייקט שמגדיר איך מקור נתונים צריך לסמן לחץ חוזר על סמך המצב של התור הפנימי שלו. אסטרטגיית ההמתנה בתור מקצה גודל לכל מקטע, ומשווים את הגודל הכולל של כל המקטעים בתור למספר מסוים שנקרא נקודת הפסגה.

קורא קורא את הקטעים בתוך הסטרימינג. הקוראים האלה מאחזרים את הנתונים בחלקים, ומאפשרים לבצע כל פעולה שרוצים עליהם. הקורא יחד עם קוד העיבוד האחר שמצורף אליו נקרא צרכן.

המבנה הבא בהקשר הזה נקרא נאמן מידע. לכל מקור נתונים שאפשר לקרוא לו יש רכיב בקרה שמשויך אליו, וכפי ששמו מרמז, הוא מאפשר לשלוט במקור הנתונים.

רק קורא אחד יכול לקרוא את מקור הנתונים בכל רגע נתון. כשקורא נוצר ומתחיל לקרוא את מקור הנתונים (כלומר הופך לקורא פעיל), הוא מונע ממנו. אם רוצים שקורא אחר ימשיך לקרוא את הסטרימינג, בדרך כלל צריך לשחרר את הקורא הראשון לפני שמבצעים פעולה אחרת (אבל אפשר גם לחלק את הסטרימינג).

יצירת מקור נתונים לקריאה

כדי ליצור מקור נתונים שאפשר לקרוא אותו, צריך להפעיל את ה-constructor שלו, ReadableStream(). למבנה ה-constructor יש ארגומנט אופציונלי underlyingSource, שמייצג אובייקט עם שיטות ומאפיינים שמגדירים את האופן שבו מופע הסטרימינג שנוצר יתנהג.

underlyingSource

אפשר לעשות זאת באמצעות השיטות האופציונליות הבאות שמוגדרות על ידי המפתח:

  • start(controller): הקריאה מתבצעת מיד כשהאובייקט נוצר. השיטה יכולה לגשת למקור הסטרימינג ולבצע כל פעולה אחרת שנדרשת להגדרת הפונקציונליות של הסטרימינג. אם התהליך הזה צריך להתבצע באופן אסינכרוני, השיטה יכולה להחזיר הבטחה (promise) כדי לסמן הצלחה או כשל. הפרמטר controller שמועבר ל-method הוא ReadableStreamDefaultController.
  • pull(controller): אפשר להשתמש בו כדי לשלוט בשידור בזמן אחזור קטעים נוספים. היא מופעלת שוב ושוב כל עוד תור המקטעים הפנימי של השידור לא מלא, עד שהתור מגיע לסימן המים הגבוה שלו. אם התקשרות אל pull() היא הבטחה, לא תתבצע שיחה חוזרת אל pull() עד שההבטחה הזו תמומש. אם הבטחה תידחה, ההעברה תהיה עם שגיאה.
  • cancel(reason): הפונקציה הזו נקראת כשצרכן הסטרימינג מבטל את הסטרימינג.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController תומך בשיטות הבאות:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

הארגומנט השני, שגם הוא אופציונלי, של ה-constructor של ReadableStream() הוא queuingStrategy. זהו אובייקט שמגדיר אופציונלית אסטרטגיית תורים לסטרימינג, עם שני פרמטרים:

  • highWaterMark: מספר לא שלילי שמייצג את נקודת השיא של הסטרימינג באמצעות שיטת התור הזו.
  • size(chunk): פונקציה שמחשבת ומחזירה את הגודל המוגבל והחיובי של ערך הקטע הנתון. התוצאה משמשת לקביעת לחץ החזרה, שמופיע דרך המאפיין המתאים ReadableStreamDefaultController.desiredSize. הוא קובע גם מתי מתבצעת הקריאה לשיטה pull() של המקור הבסיסי.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

השיטות getReader() ו-read()

כדי לקרוא מזרם שאפשר לקרוא, צריך קורא, שהוא ReadableStreamDefaultReader. השיטה getReader() של הממשק ReadableStream יוצרת קורא ונועלת את הסטרימינג אליו. כשהשידור נעול, לא ניתן לקבל קורא אחר עד שהקורא הזה ישוחרר.

השיטה read() בממשק ReadableStreamDefaultReader מחזירה הבטחה שמעניקה גישה למקטע הבא בתור הפנימי של השידור. היא ממלאת או דוחה תוצאה בהתאם למצב הסטרימינג. האפשרויות השונות הן:

  • אם יש מקטע זמין, ההתחייבות תתמלא באובייקט מהצורה
    { value: chunk, done: false }.
  • אם השידור ייסגר, ההבטחה תמומש באמצעות אובייקט שצורף לטופס
    { value: undefined, done: true }.
  • אם תהיה שגיאה בסטרימינג, ההבטחה תידחה עם השגיאה הרלוונטית.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

הנכס locked

כדי לבדוק אם שידור ניתן לקריאה נעול, נכנסים למאפיין ReadableStream.locked שלו.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

דוגמאות של קוד שידור ניתן לקריאה

בדוגמת הקוד שבהמשך אפשר לראות את כל השלבים בפעולה. קודם יוצרים ReadableStream שמגדיר את השיטה start() בארגומנט underlyingSource שלו (כלומר, בכיתה TimestampSource). השיטה הזו מנחה את controller של השידור ל-enqueue() חותמת זמן כל שנייה במהלך עשר שניות. לבסוף, הוא מצווה על הבקר close() את הסטרימינג. כדי לצרוך את המקור הזה, יוצרים קורא באמצעות ה-method‏ getReader() ומפעילים את read() עד שהמקור הוא done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

איטרציה אסינכרונית

בדיקה בכל מחזור של הלולאה read() אם הסטרימינג הוא done עשויה להיות לא ממשק ה-API הנוח ביותר. למרבה המזל, בקרוב תהיה דרך טובה יותר לעשות זאת: איטרציה אסינכרונית.

for await (const chunk of stream) {
  console.log(chunk);
}

פתרון עקיף לשימוש בחזרה אסינכררונית היום הוא הטמעת ההתנהגות באמצעות polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

יצירת טיזר של זרם קריא

השיטה tee() של הממשק ReadableStream מחלקת את הסטרימינג הנוכחי לקריאה, ומחזירה מערך של שני רכיבים שמכיל את שני ההסתעפויות שהתקבלו כמכונות ReadableStream חדשות. כך שני קוראים יכולים לקרוא את אותו מקור בו-זמנית. אפשר לעשות זאת, למשל, בקובץ שירות (service worker) אם רוצים לאחזר תשובה מהשרת ולהעביר אותה בסטרימינג לדפדפן, אבל גם להעביר אותה בסטרימינג למטמון של קובץ השירות. מכיוון שאי אפשר לצרוך גוף תגובה יותר מפעם אחת, צריך שתי עותקים כדי לעשות זאת. כדי לבטל את המקור, צריך לבטל את שני ההסתעפויות שנוצרו. הוספת תוכן של סטרימינג ינעלה אותו בדרך כלל למשך פרק הזמן הזה, כך שקוראים אחרים לא יוכלו לנעול אותו.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

מקורות נתונים של בייטים שניתנים לקריאה

לגבי מקורות נתונים שמייצגים בייטים, קיימת גרסה מורחבת של מקור הנתונים שאפשר לקרוא, כדי לטפל בבייטים ביעילות, במיוחד על ידי צמצום מספר העותקים. באמצעות שידורי בייטים אפשר לקבל קוראים מסוג bring-your-own-buffer‏ (BYOB). הטמעת ברירת המחדל יכולה לספק מגוון של פלטים שונים, כמו מחרוזות או מאגרי נתונים זמניים של מערכים במקרה של WebSockets, ואילו סטרימינג של בייטים מבטיחים פלט של בייטים. בנוסף, לקוראים של BYOB יש יתרונות יציבות. הסיבה לכך היא שאם מאגר מנותק, אפשר להבטיח שלא כותבים לאותו מאגר פעמיים, וכך להימנע ממצבים של תחרות. קוראי BYOB יכולים לצמצם את מספר הפעמים שהדפדפן צריך להריץ את האיסוף של האשפה, כי הוא יכול לעשות שימוש חוזר במאגרים.

יצירת שידור חי של בייטים שניתן לקרוא

כדי ליצור שידור עם בייטים קריאים, מעבירים פרמטר type נוסף ל-constructor של ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

המקור הבסיסי של מקור בייט לקריאה מקבל ReadableByteStreamController כדי לבצע בו מניפולציה. ה-method ReadableByteStreamController.enqueue() שלה לוקחת את ארגומנט chunk שהערך שלו הוא ArrayBufferView. המאפיין ReadableByteStreamController.byobRequest מחזיר את בקשת ה-pull הנוכחית של BYOB, או null אם אין כזו. לבסוף, הנכס ReadableByteStreamController.desiredSize מחזיר את הגודל הרצוי כדי למלא את התור הפנימי של הסטרימינג המבוקר.

queuingStrategy

הארגומנט השני של ה-constructor של ReadableStream(), שגם הוא אופציונלי, הוא queuingStrategy. זהו אובייקט שמגדיר אופציונלית אסטרטגיית תורים לסטרימינג, עם פרמטר אחד:

  • highWaterMark: מספר בייטים לא שלילי שמציין את נקודת השיא של הסטרימינג באמצעות שיטת התור הזו. המאפיין הזה משמש לקביעת לחץ החזרה, שמופיע דרך המאפיין המתאים ReadableByteStreamController.desiredSize. הוא גם קובע מתי מתבצעת קריאה ל-method של pull() של המקור הבסיסי.

השיטות getReader() ו-read()

לאחר מכן תוכלו לקבל גישה ל-ReadableStreamBYOBReader על ידי הגדרת הפרמטר mode בהתאם: ReadableStream.getReader({ mode: "byob" }). כך אפשר לשלוט בצורה מדויקת יותר בהקצאת המאגר כדי למנוע העתקות. כדי לקרוא מזרם הבייטים, צריך לבצע קריאה ל-ReadableStreamBYOBReader.read(view), כאשר view הוא ArrayBufferView.

דוגמת קוד של מחרוזת בייט לקריאה

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

הפונקציה הבאה מחזירה מקורות נתונים של בייטים לקריאה, שמאפשרים קריאה יעילה ללא העתקה של מערך שנוצר באופן אקראי. במקום להשתמש בגודל מקטע מוגדר מראש של 1,024, הוא מנסה למלא את מאגר הנתונים הזמני שסופק על ידי המפתח, ומאפשר שליטה מלאה.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

המכניקה של סטרימינג ניתן לכתיבה

מקור נתונים לכתיבה הוא יעד שבו אפשר לכתוב נתונים, שמיוצגים ב-JavaScript באובייקט WritableStream. הוא משמש כהפשטה מעל כיור בסיסי – sink ברמה נמוכה יותר של קלט/פלט (I/O) שבו נכתבים נתונים גולמיים.

הנתונים נכתבים אל מקור הנתונים באמצעות כתיבה, מקטע אחד בכל פעם. מקטע יכול להופיע במגוון צורות, בדיוק כמו המקטעים בקורא. אתם יכולים להשתמש בכל קוד שתרצו כדי ליצור את הקטעים שיהיו מוכנים לכתיבה. הקוד וגם הכלי לכתיבה נקראים מפיק.

כשיוצרים גורם כתיבה ומתחילים לכתוב בזרם (גורם כתיבה פעיל), הוא מונע ממנו. רק גורם אחד יכול לכתוב בזרם שאפשר לכתוב בו בכל פעם. אם רוצים שגורם אחר יתחיל לכתוב בסטרימינג, בדרך כלל צריך לשחרר אותו ואז לצרף אליו גורם כתיבה אחר.

תור פנימי עוקב אחרי קטעי הנתונים שנכתבו בסטרימינג אבל עדיין לא עברו עיבוד על ידי היעד הבסיסי.

שיטת תורים היא אובייקט שמגדיר איך מקור נתונים צריך לסמן לחץ חוזר על סמך המצב של התור הפנימי שלו. אסטרטגיית ההמתנה בתור מקצה גודל לכל מקטע, ומשווים את הגודל הכולל של כל המקטעים בתור למספר מסוים שנקרא נקודת הפסגה.

המבנה הסופי נקרא בקר. לכל מקור נתונים לכתיבה יש אמצעי בקרה משויך שמאפשר לשלוט במקור הנתונים (לדוגמה, לבטל אותו).

יצירת זרם לכתיבה

הממשק WritableStream של Streams API מספק הפשטה סטנדרטית לכתיבת נתונים בסטרימינג ליעד, שנקרא sink. האובייקט הזה כולל לחץ חוזר (backpressure) ויצירת תורים מובנים. כדי ליצור מקור נתונים לכתיבה, צריך להפעיל את ה-constructor שלו, WritableStream(). יש לו פרמטר underlyingSink אופציונלי, שמייצג אובייקט עם שיטות ומאפיינים שמגדירים את האופן שבו מופע המקור יתנהג.

underlyingSink

underlyingSink יכול לכלול את השיטות האופציונליות הבאות שמוגדרות על ידי המפתח. הפרמטר controller שמועבר לחלק מהשיטות הוא WritableStreamDefaultController.

  • start(controller): השיטה הזו מופעלת מיד לאחר בניית האובייקט. התוכן בשיטה הזו צריך להיות גישה ל-sink הבסיסי. אם התהליך הזה צריך להתבצע באופן אסינכרוני, אפשר להחזיר הבטחה כדי לסמן הצלחה או כישלון.
  • write(chunk, controller): השיטה הזו תופעל כשמקטע נתונים חדש (שצוין בפרמטר chunk) יהיה מוכן להיכתב לבור הנתונים הבסיסי. הוא יכול להחזיר הבטחה כדי לסמן הצלחה או כישלון של פעולת הכתיבה. תתבצע קריאה לשיטה הזו רק אחרי שפעולות כתיבה קודמות יסתיימו בהצלחה, ולעולם לא אחרי שהשידור ייסגר או יבוטל.
  • close(controller): תתבצע קריאה לשיטה הזו אם האפליקציה מזהה שהיא סיימה לכתוב מקטעי נתונים בסטרימינג. התוכן צריך לבצע את כל הפעולות הנדרשות כדי לסיים את הכתיבה לבור הנתונים הבסיסי ולשחרר את הגישה אליו. אם התהליך הזה הוא אסינכרוני, הוא יכול להחזיר הבטחה (promise) כדי לסמן הצלחה או כישלון. המערכת תקרא לשיטה הזו רק אחרי שכל פעולות הכתיבה שנמצאות בתור יסתיימו בהצלחה.
  • abort(reason): תתבצע קריאה לשיטה הזו אם האפליקציה מזהה שהיא מעוניינת לסגור את השידור בפתאומיות ולהעביר אותו למצב שגיאה. הוא יכול לנקות משאבים שנשמרו, בדומה ל-close(), אבל abort() ייכלל גם אם יש שורות ברשימה של פעולות כתיבה. הקטעים האלה יושמדו. אם התהליך הזה הוא אסינכרוני, הוא יכול להחזיר הבטחה כדי לסמן הצלחה או כישלון. הפרמטר reason מכיל את הערך DOMString שמתאר את הסיבה לביטול ההעברה.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

הממשק WritableStreamDefaultController של Streams API מייצג בקר שמאפשר לשלוט במצב של WritableStream במהלך ההגדרה, כשמעבירים עוד קטעים לכתיבה או בסוף הכתיבה. כשבונים WritableStream, ה-sink הבסיסי מקבל מכונת WritableStreamDefaultController תואמת שאפשר לשנות. ל-WritableStreamDefaultController יש רק method אחד: WritableStreamDefaultController.error(), שגורם לשגיאות בכל אינטראקציה עתידית עם הסטרימינג המשויך. WritableStreamDefaultController תומך גם במאפיין signal שמחזיר מופע של AbortSignal, שמאפשר להפסיק פעולת WritableStream במקרה הצורך.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

הארגומנט השני של ה-constructor של WritableStream(), שגם הוא אופציונלי, הוא queuingStrategy. זהו אובייקט שמגדיר אופציונלית אסטרטגיית תורים לסטרימינג, עם שני פרמטרים:

  • highWaterMark: מספר לא שלילי שמייצג את נקודת השיא של הסטרימינג באמצעות שיטת התור הזו.
  • size(chunk): פונקציה שמחשבת ומחזירה את הגודל המוגבל והחיובי של ערך הקטע הנתון. התוצאה משמשת לקביעת לחץ לאחור, מתבטאת באמצעות מאפיין WritableStreamDefaultWriter.desiredSize המתאים.

השיטות getWriter() ו-write()

כדי לכתוב בסטרימינג שאפשר לכתוב בו, צריך גורם כתיבה, שהוא WritableStreamDefaultWriter. ה-method getWriter() בממשק WritableStream מחזירה מופע חדש של WritableStreamDefaultWriter ונועל את השידור למכונה הזו. בזמן שהסטרימינג נעול, אי אפשר לצרף סופר אחר עד שמשחררים את הסופר הנוכחי.

השיטה write() של הממשק WritableStreamDefaultWriter כותבת מקטע נתונים שהועברו ל-WritableStream ולצינור הניקוז הבסיסי שלו, ואז מחזירה הבטחה שמתבררת כדי לציין את ההצלחה או הכישלון של פעולת הכתיבה. חשוב לזכור שהמשמעות של 'הצלחה' תלויה ב-sink הבסיסי. ייתכן שהמשמעות היא שהקטע אושר, ולא בהכרח שהוא נשמר בבטחה ביעד הסופי שלו.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

הנכס locked

כדי לבדוק אם מקור נתונים לכתיבה נעול, אפשר לגשת לנכס WritableStream.locked שלו.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

דוגמת קוד של מקור נתונים לכתיבה

בדוגמת הקוד שמוצגת בהמשך אפשר לראות את כל השלבים בפעולה.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

העברה של מקור נתונים לקריאה למקור נתונים לכתיבה

אפשר להעביר באמצעות צינור (pipe) מקור נתונים לקריאה למקור נתונים לכתיבה באמצעות השיטה pipeTo() של מקור הנתונים לקריאה. ReadableStream.pipeTo() מעביר את הערך של ReadableStream הנוכחי ל-WritableStream נתון ומחזיר הבטחה שמתממשת כשתהליך הצינור עיבוד נתונים מסתיים בהצלחה, או דוחה אם אירעו שגיאות.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

יצירת מקור נתונים לטראנספורמציה

הממשק TransformStream של Streams API מייצג קבוצה של נתונים שניתן לבצע בהם טרנספורמציה. כדי ליצור transform stream, צריך לקרוא למבנה ה-constructor שלו, TransformStream(), שיוצר אובייקט transform stream מהמפעילים שצוינו ומחזיר אותו. ה-constructor של TransformStream() מקבל כארגומנט הראשון אובייקט JavaScript אופציונלי שמייצג את transformer. אובייקטים כאלה יכולים להכיל כל אחת מהשיטות הבאות:

transformer

  • start(controller): ה-method הזו נקראת מיד כשהאובייקט נוצר. בדרך כלל משתמשים באפשרות הזו כדי להוסיף מקטעי קידומת לתור, באמצעות controller.enqueue(). הקטעים האלה יקראו מהצד הקריא, אבל הם לא תלויים בכתיבת בצד הכתיבה. אם התהליך הראשוני הוא אסינכרוני, למשל כי צריך להשקיע מאמץ מסוים כדי להשיג את מקטעי התחילית, הפונקציה יכולה להחזיר הבטחה לאותת הצלחה או כישלון. הבטחה שנדחתה תגרום בשגיאה בזרם. כל חריגות שייזרקו יושלחו מחדש על ידי ה-constructor של TransformStream().
  • transform(chunk, controller): ה-method הזה נקרא כשמקטע חדש שנכתב במקור בצד הכתיבה מוכן לטרנספורמציה. הטמעת הסטרימינג מבטיחה שהפעלת הפונקציה הזו תתבצע רק אחרי שהטרנספורמציות הקודמות הסתיימו, אף פעם לא לפני שהטרנספורמציה start() הסתיימה או אחרי שהפעלת flush(). הפונקציה מבצעת את עבודת הטרנספורמציה בפועל של הזרם הטרנספורמציה. הוא יכול להוסיף את התוצאות לתור באמצעות controller.enqueue(). כך אפשר לכתוב מקטע יחיד שנכתב בצד הניתן לכתיבה, וכך ליצור אפס או כמה מקטעים בצד הקריא, בהתאם למספר הקריאה של controller.enqueue(). אם תהליך הטרנספורמציה הוא אסינכרוני, הפונקציה הזו יכולה להחזיר הבטחה לאותת הצלחה או כישלון בטרנספורמציה. אם הבטחה תידחה, תופיע שגיאה גם בצד הקריאה וגם בצד הכתיבה של מקור הנתונים לעיבוד. אם לא סופקה method transform(), ייעשה שימוש בטרנספורמציה של הזהות, שמעבירים מקטעים ללא שינוי מהצד הניתן לכתיבה לצד הקריא.
  • flush(controller): ה-method הזה נקרא אחרי שכל הקטעים שנכתבו בצד הכתיבה עברו טרנספורמציה בהצלחה דרך transform(), והצד הכתיבה עומד להיסגר. בדרך כלל משתמשים באפשרות הזו כדי להוסיף לתור קטעי סיומת בצד הקריאה, לפני שהוא נסגר גם כן. אם תהליך הניקוי הוא אסינכרוני, הפונקציה יכולה להחזיר הבטחה להצלחה או לכשל של האות. התוצאה תועבר לקורא של stream.writable.write(). בנוסף, הבטחה שנדחתה תגרום לשגיאה גם בצד הקריא וגם בצד של השידור. השלכת חריגה נחשבת לאותה פעולה כמו החזרת הבטחה שנדחתה.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

שיטות ההוספה לתור writableStrategy ו-readableStrategy

הפרמטרים האופציונליים השני והשלישי של ה-constructor של TransformStream() הם אסטרטגיות אופציונליות של writableStrategy ו-readableStrategy לתור. הם מוגדרים כפי שמתואר בקטעים של זרם הקריאה (readable) ושל זרם הכתיבה (writable) בהתאמה.

דוגמה לקוד של טרנספורמציה של מקור נתונים

דוגמת הקוד הבאה מראה איך פועלת תצוגת מקור פשוטה של טרנספורמציה.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

העברה של מקור נתונים לקריאה דרך מקור נתונים לטרנספורמציה

השיטה pipeThrough() של ממשק ReadableStream מספקת דרך לשרשור של צינור להעברת הנתונים (pipe) מהזרם הנוכחי דרך זרם טרנספורמציה או כל צמד אחר שאפשר לכתוב/לקרוא בו. בדרך כלל, העברת נתונים ב-pipe תנעל אותו למשך כל תקופת ההעברה, ותימנע מקוראים אחרים לנעול אותו.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

בדוגמת הקוד הבאה (קצת מלאכותית) מוסבר איך אפשר להטמיע גרסה 'רועשת' של fetch() שממירה את כל הטקסט לאותיות רישיות על ידי שימוש בהבטחה של התגובה שהוחזרה כזרם והמרת כל מקטע לאותיות רישיות. היתרון של הגישה הזו הוא שאין צורך להמתין להורדה של המסמך כולו, וזה יכול לעשות הבדל עצום כשעובדים עם קבצים גדולים.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

הדגמה (דמו)

בהדגמה הבאה מוצגות פעולות של קריאה, כתיבה וטרנספורמציה של מקורות נתונים. הוא כולל גם דוגמאות למחרוזות צינורות של pipeThrough() ו-pipeTo(), וגם הדגמה של tee(). אפשר להריץ את הדמו בחלון נפרד או להציג את קוד המקור.

מקורות מידע מועילים שזמינים בדפדפן

יש מספר מקורות מידע שימושיים שמובנים ישירות בדפדפן. אפשר ליצור בקלות ReadableStream מ-blob. שיטת stream()‎ של הממשק Blob מחזירה ReadableStream, שמחזיר את הנתונים שמכיל ה-blob בזמן הקריאה. חשוב לזכור גם שאובייקט File הוא סוג ספציפי של Blob, ואפשר להשתמש בו בכל הקשר שבו אפשר להשתמש ב-blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

הווריאנטים של TextDecoder.decode() ו-TextEncoder.encode() לשידור בזמן אמת נקראים TextDecoderStream ו-TextEncoderStream, בהתאמה.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

קל לדחוס או לחלץ קובץ באמצעות המרות של CompressionStream ו-DecompressionStream, בהתאמה. דוגמת הקוד הבאה מראה איך אפשר להוריד את המפרט של Streams, לדחוס (gzip) אותו ישירות מהדפדפן ולכתוב את הקובץ הדחוס ישירות לדיסק.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream של File System Access API וfetch() request streams הניסיוניים הם דוגמאות לזרמים שאפשר לכתוב בהם בעולם האמיתי.

ה-Series API עושה שימוש נרחב גם בשידורים קריאים וגם בשידורים ניתנים לכתיבה.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

לבסוף, ה-API של WebSocketStream משלב בין סטרימינג ל-WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

משאבים שימושיים

תודות

הבדיקה של המאמר בוצעה על ידי Jake Archibald,‏ François Beaufort,‏ Sam Dutton,‏ Mattias Buelens,‏ Surma,‏ Joe Medley ו-Adam Rice. הפוסטים של ג'ייק ארצ'יבלד עזרו לי מאוד להבין את השידור. חלק מדגימות הקוד מבוססות על החקירות של משתמש GitHub‏ ‎@bellbind, וחלק מהטקסט מבוסס במידה רבה על מסמכי התיעוד של MDN בנושא Streams. המחברים של Streams Standard עשו עבודה נהדרת בכתיבת המפרט הזה. התמונה הראשית היא של Ryan Lara ב-Unsplash.