איך משתמשים ב-Streams API כדי לקרוא, לכתוב ולבצע טרנספורמציה של מקורות נתונים (streams)
Streams API מאפשר לכם לגשת באופן פרוגרמטי למקורות נתונים שהתקבלו ברשת או נוצרו בכל אמצעי מקומי, ולעבד אותם באמצעות JavaScript. סטרימינג כולל פירוק של משאב שרוצים לקבל, לשלוח או לשנות למקטעים קטנים, ולאחר מכן עיבוד של המקטעים האלה ביט אחרי ביט. דפדפנים מבצעים סטרימינג בכל מקרה כשהם מקבלים נכסים כמו HTML או סרטונים שמוצגים בדפי אינטרנט, אבל היכולת הזו לא הייתה זמינה ל-JavaScript לפני שהתכונה fetch
עם סטרימינג הושקה ב-2015.
בעבר, אם רצית לעבד משאב כלשהו (סרטון, קובץ טקסט וכו'), היה עליך להוריד את הקובץ כולו, להמתין עד שהוא יתבצע סריאליזציה לפורמט מתאים ואז לעבד אותו. כשהסטרים זמינים ל-JavaScript, כל זה משתנה. מעכשיו אפשר לעבד נתונים גולמיים באמצעות JavaScript באופן הדרגתי ברגע שהם זמינים בצד הלקוח, בלי צורך ליצור מאגר נתונים זמני, מחרוזת או blob. כך אפשר להשתמש במספר תרחישים לדוגמה, חלק מהם מפורטים בהמשך:
- אפקטים של וידאו: העברה של שידור וידאו קריא דרך שידור טרנספורמציה שמחיל אפקטים בזמן אמת.
- (הסרת) דחיסת נתונים: העברת מקור נתונים דרך מקור נתונים מותאם שמבצע (הסרת) דחיסה באופן סלקטיבי.
- פענוח תמונות: העברה של מקור נתונים של תגובת HTTP דרך מקור נתונים של טרנספורמציה שמפענח בייטים לנתוני ביומטריק, ולאחר מכן דרך מקור נתונים נוסף של טרנספורמציה שמתרגם ביומטריק ל-PNG. אם מתקינים את הקוד בתוך הטיפולן
fetch
של שירות ה-worker, אפשר להשתמש בו כדי להוסיף polyfill לשקיפות לפורמטים חדשים של תמונות, כמו AVIF.
תמיכה בדפדפנים
ReadableStream ו-WritableStream
TransformStream
מושגי ליבה
לפני שנכנס לפרטים על הסוגים השונים של שידורים, אסביר על כמה מושגים בסיסיים.
גושים
מקטע הוא חלק יחיד של נתונים שנכתב בזרם או נקרא ממנו. הוא יכול להיות מכל סוג, והזרמים יכולים לכלול גם קטעים מסוגים שונים. ברוב המקרים, מקטע לא יהיה יחידת הנתונים האטומית ביותר של מקור נתונים נתון. לדוגמה, מקור נתונים של בייטים עשוי להכיל קטעים שמכילים 16 יחידות Uint8Array
בגודל KiB, במקום בייטים בודדים.
מקורות נתונים שניתנים לקריאה
מקור נתונים שאפשר לקרוא מייצג מקור נתונים שאפשר לקרוא ממנו. במילים אחרות, הנתונים יוצאים מזרם שניתן לקריאה. באופן ספציפי, מקור נתונים לקריאה הוא מופע של המחלקה ReadableStream
.
מקורות נתונים שניתנים לכתיבה
מקור נתונים לכתיבה מייצג יעד לנתונים שאפשר לכתוב בהם. במילים אחרות, הנתונים נכנסים למקור נתונים שאפשר לכתוב בו. באופן ספציפי, מקור נתונים לכתיבה הוא מופע של המחלקה WritableStream
.
טרנספורמציה של מקורות נתונים
מקור נתונים לטרנספורמציה מורכב מזוג מקורות נתונים: מקור נתונים לכתיבה, שנקרא הצד לכתיבה, ומקור נתונים לקריאה, שנקרא הצד לקריאה.
מטאפורה לכך בעולם האמיתי היא מתרגם סימולטני שמתרגם משפה אחת לשפה אחרת בזמן אמת.
באופן ספציפי למקור הנתונים של הטרנספורמציה, כתיבת נתונים בצד הכתיבה מאפשרת לקרוא נתונים חדשים בצד הקריאה. באופן ספציפי, כל אובייקט עם מאפיין writable
ומאפיין readable
יכול לשמש בתור זרם טרנספורמציה. עם זאת, בעזרת הכיתה הרגילה TransformStream
קל יותר ליצור זוג כזה שמקושר בצורה נכונה.
שרשראות לצינורות
השימוש העיקרי בזרמים הוא העברה (piping) שלהם זה לזה. אפשר להעביר זרם לקריאה ישירות לזרם לכתיבה באמצעות השיטה pipeTo()
של הזרם לקריאה, או להעביר אותו דרך זרם טרנספורמציה אחד או יותר באמצעות השיטה pipeThrough()
של הזרם לקריאה. קבוצה של מקורות נתונים שמחוברים יחד בצינור נקראת שרשרת צינורות.
לחץ חזרה
אחרי שיוצרים שרשרת צינורות, היא מפיצה אותות לגבי המהירות שבה קטעי הקוד צריכים לעבור בה. אם שלב כלשהו בשרשרת עדיין לא יכול לקבל קטעי קוד, הוא מעביר אות לאחור דרך שרשרת הצינור, עד שבסופו של דבר המקור המקורי מקבל הודעה להפסיק לייצר קטעי קוד במהירות כזו. התהליך הזה של נורמליזציה של התנועה נקרא לחץ חזרה.
הנחיתה על המגרש
אפשר להשתמש בשיטה tee()
כדי ליצור יציאה מרכזית (נקראת כך בגלל הצורה שלה, 'T' גדול) לשידור שניתן לקריאה.
הפעולה הזו תנעיל את הסטרימינג, כלומר לא תהיה אפשרות להשתמש בו באופן ישיר. עם זאת, היא תיצור שני סטרימינגים חדשים, שנקראים ענפים, שאפשר לצרוך אותם בנפרד.
חשוב גם להתחיל את השידור בזמן הנכון כי אי אפשר להריץ אותו לאחור או להפעיל אותו מחדש. נרחיב על כך בהמשך.
המנגנון של מקור נתונים קריא
מקור נתונים שאפשר לקרוא אותו הוא מקור נתונים שמיוצג ב-JavaScript באמצעות אובייקט ReadableStream
שמגיע ממקור בסיסי. ה-constructor של ReadableStream()
יוצר אובייקט של מקור נתונים לקריאה מהמפעילים שצוינו, ומחזיר אותו. יש שני סוגים של מקורות בסיסיים:
- מקורות דחיפה שולחים נתונים כל הזמן אחרי שמקבלים אליהם גישה, ועליך להתחיל, להשהות או לבטל את הגישה לשידור. דוגמאות לכך הן שידורי וידאו חיים, אירועים שנשלחים מהשרת או WebSockets.
- במקורות משיכה צריך לבקש מהם נתונים באופן מפורש אחרי שמתחברים אליהם. דוגמאות לכך הן פעולות HTTP באמצעות קריאות
fetch()
אוXMLHttpRequest
.
נתוני מקור הנתונים נקרא ברצף בחלקים קטנים שנקראים קטעים. הקטעים שמתווספים לזרם נקראים קטעים בתור. כלומר, הם ממתינים בתור ומוכן לקריאה. תור פנימי עוקב אחרי הקטעים שעדיין לא נקראו.
שיטת תורים היא אובייקט שמגדיר איך מקור נתונים צריך לסמן לחץ חוזר על סמך המצב של התור הפנימי שלו. אסטרטגיית ההמתנה בתור מקצה גודל לכל מקטע, ומשווים את הגודל הכולל של כל המקטעים בתור למספר מסוים שנקרא נקודת הפסגה.
קורא קורא את הקטעים בתוך הסטרימינג. הקוראים האלה מאחזרים את הנתונים ברצף, כך שתוכלו לבצע כל פעולה שתרצו עליהם. הקורא יחד עם קוד העיבוד האחר שמצורף אליו נקרא צרכן.
המבנה הבא בהקשר הזה נקרא בקר. לכל מקור נתונים שאפשר לקרוא לו יש רכיב בקרה שמשויך אליו, וכפי ששמו מרמז, הוא מאפשר לשלוט במקור הנתונים.
רק קורא אחד יכול לקרוא את מקור הנתונים בכל רגע נתון. כשקורא נוצר ומתחיל לקרוא את מקור הנתונים (כלומר הופך לקורא פעיל), הוא מונע ממנו. אם רוצים שקורא אחר ימשיך לקרוא את הסטרימינג, בדרך כלל צריך לשחרר את הקורא הראשון לפני שמבצעים פעולה אחרת (אבל אפשר גם לחלק את הסטרימינג).
יצירת מקור נתונים לקריאה
כדי ליצור מקור נתונים שאפשר לקרוא אותו, צריך להפעיל את ה-constructor שלו, ReadableStream()
.
למבנה ה-constructor יש ארגומנט אופציונלי underlyingSource
, שמייצג אובייקט עם שיטות ומאפיינים שמגדירים את האופן שבו מופע הסטרימינג שנוצר יתנהג.
underlyingSource
אפשר להשתמש בשיטות האופציונליות הבאות שהוגדרו על ידי המפתחים:
start(controller)
: הקריאה מתבצעת מיד כשהאובייקט נוצר. השיטה יכולה לגשת למקור הסטרימינג ולבצע כל פעולה אחרת שנדרשת להגדרת הפונקציונליות של הסטרימינג. אם התהליך הזה צריך להתבצע באופן אסינכרוני, השיטה יכולה להחזיר הבטחה (promise) כדי לסמן הצלחה או כישלון. הפרמטרcontroller
שמוענק לשיטה הזו הואReadableStreamDefaultController
.pull(controller)
: אפשר להשתמש בו כדי לשלוט בשידור בזמן אחזור קטעים נוספים. הוא נקרא שוב ושוב כל עוד הקטעים שבתור הפנימי של הסטרימינג לא מלאים, עד שהתור מגיע לנקודת השיא שלו. אם התוצאה של קריאה ל-pull()
היא הבטחה, לא תתבצע קריאה חוזרת ל-pull()
עד שההבטחה תתמלא. אם הבטחה תידחה, ההעברה תהיה עם שגיאה.cancel(reason)
: הפונקציה הזו נקראת כשצרכן הסטרימינג מבטל את הסטרימינג.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
תומך בשיטות הבאות:
ReadableStreamDefaultController.close()
סוגר את המקור המשויך.ReadableStreamDefaultController.enqueue()
הוספת מקטע נתון לתור בסטרימינג המשויך.ReadableStreamDefaultController.error()
גורם לשגיאות בכל אינטראקציה עתידית עם הסטרימינג המשויך.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
הארגומנט השני של ה-constructor של ReadableStream()
, שגם הוא אופציונלי, הוא queuingStrategy
.
זהו אובייקט שמגדיר אופציונלית אסטרטגיית תורים לסטרימינג, עם שני פרמטרים:
highWaterMark
: מספר לא שלילי שמציין את נקודת השיא של הסטרימינג באמצעות שיטת התור הזו.size(chunk)
: פונקציה שמחשבת ומחזירה את הגודל המוגבל והחיובי של ערך הקטע הנתון. התוצאה משמשת לקביעת לחץ החזרה, שמופיע דרך המאפיין המתאיםReadableStreamDefaultController.desiredSize
. הוא קובע גם מתי מתבצעת הקריאה לשיטהpull()
של המקור הבסיסי.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
השיטות getReader()
ו-read()
כדי לקרוא מזרם שאפשר לקרוא, צריך קורא, שהוא ReadableStreamDefaultReader
.
השיטה getReader()
של הממשק ReadableStream
יוצרת קורא ונועלת את הסטרימינג אליו. כשהשידור נעול, לא ניתן לקבל קורא אחר עד שהקורא הזה ישוחרר.
השיטה read()
של הממשק ReadableStreamDefaultReader
מחזירה הבטחה (promise) שמספקת גישה לקטע הבא בתור הפנימי של הסטרימינג. הוא ממלא או דוחה את הבקשה עם תוצאה, בהתאם למצב של המקור. האפשרויות השונות הן:
- אם מקטע זמין, ההתחייבות תתמלא באובייקט מהצורה
{ value: chunk, done: false }
. - אם הסטרימינג יסתיים, ההתחייבות תתמלא באובייקט מהצורה
{ value: undefined, done: true }
. - אם תהיה שגיאה בסטרימינג, ההבטחה תידחה עם השגיאה הרלוונטית.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
הנכס locked
כדי לבדוק אם מקור נתונים שאפשר לקרוא אותו נעול, צריך לגשת לנכס ReadableStream.locked
שלו.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
דוגמאות לקוד של שידורים שאפשר לקרוא
בדוגמת הקוד שבהמשך מוצגים כל השלבים בפעולה. קודם יוצרים ReadableStream
שמגדיר את השיטה start()
בארגומנט underlyingSource
שלו (כלומר, בכיתה TimestampSource
).
השיטה הזו מורה ל-controller
של השידור להוסיף חותמת זמן בכל שנייה במשך עשר שניות.enqueue()
לבסוף, הוא מצווה על הבקר close()
את הסטרימינג. כדי לצרוך את המקור הזה, יוצרים קורא באמצעות ה-method getReader()
ומפעילים את read()
עד שהמקור הוא done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
איטרציה אסינכרונית
בדיקה בכל מחזור של הלולאה read()
אם הסטרימינג הוא done
עשויה להיות לא ממשק ה-API הנוח ביותר.
למרבה המזל, בקרוב תהיה דרך טובה יותר לעשות זאת: איטרציה אסינכרונית.
for await (const chunk of stream) {
console.log(chunk);
}
פתרון עקיף לשימוש בחזרה אסינכררונית היום הוא הטמעת ההתנהגות באמצעות polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
יצירת מקור נתונים לקריאה
השיטה tee()
של הממשק ReadableStream
מחלקת את המקור הנוכחי לקריאה לשני ענפים, ומחזירה מערך של שני רכיבים שמכיל את שני הענפים שהתקבלו כמכונות ReadableStream
חדשות. כך שני קוראים יכולים לקרוא את אותו מקור בו-זמנית. אפשר לעשות זאת, למשל, בקובץ שירות (service worker) אם רוצים לאחזר תשובה מהשרת ולהעביר אותה בסטרימינג לדפדפן, אבל גם להעביר אותה בסטרימינג למטמון של קובץ השירות. מכיוון שאי אפשר לצרוך גוף תגובה יותר מפעם אחת, צריך שתי עותקים כדי לעשות זאת. כדי לבטל את המקור, צריך לבטל את שני ההסתעפויות שנוצרו. בדרך כלל, הנעילה של מקור נתונים תהיה בתוקף למשך כל זמן הקריאה, כדי למנוע מקוראים אחרים לנעול אותו.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
מקורות נתונים של בייטים שניתנים לקריאה
לגבי מקורות נתונים שמייצגים בייטים, קיימת גרסה מורחבת של מקור הנתונים שאפשר לקרוא, כדי לטפל בבייטים ביעילות, במיוחד על ידי צמצום מספר העותקים. באמצעות שידורי בייטים אפשר לקבל קוראים מסוג 'מביאים את המאגר שלכם' (BYOB). הטמעת ברירת המחדל יכולה לספק מגוון של תפוקות שונות, כמו מחרוזות או מאגרי מערכי ב-WebSockets, בעוד שזרמי בייטים מבטיחים פלט של בייטים. בנוסף, לקוראים של BYOB יש יתרונות יציבות. הסיבה לכך היא שאם מאגר יוצא משימוש, אפשר להבטיח שלא כותבים לאותו מאגר פעמיים, וכך להימנע ממצבים של תחרות. קוראים מסוג BYOB יכולים לצמצם את מספר הפעמים שהדפדפן צריך להריץ איסוף אשפה, כי הוא יכול לעשות שימוש חוזר במאגרים.
יצירת מקור נתונים של בייטים לקריאה
כדי ליצור מקור נתונים של בייטים שאפשר לקרוא, מעבירים פרמטר type
נוסף למבנה ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
המקור הבסיסי של מקור בייט לקריאה מקבל ReadableByteStreamController
כדי לבצע בו מניפולציה. השיטה ReadableByteStreamController.enqueue()
מקבלת ארגומנט chunk
שהערך שלו הוא ArrayBufferView
. המאפיין ReadableByteStreamController.byobRequest
מחזיר את בקשת ה-pull הנוכחית של BYOB, או null אם אין כזו. לבסוף, הנכס ReadableByteStreamController.desiredSize
מחזיר את הגודל הרצוי כדי למלא את התור הפנימי של הסטרימינג המבוקר.
queuingStrategy
הארגומנט השני של ה-constructor של ReadableStream()
, שגם הוא אופציונלי, הוא queuingStrategy
.
זהו אובייקט שמגדיר אופציונלית אסטרטגיית תורים לסטרימינג, עם פרמטר אחד:
highWaterMark
: מספר בייטים לא שלילי שמציין את נקודת השיא של הסטרימינג באמצעות שיטת התור הזו. המאפיין הזה משמש לקביעת לחץ החזרה, שמופיע דרך המאפיין המתאיםReadableByteStreamController.desiredSize
. הוא קובע גם מתי מתבצעת הקריאה לשיטהpull()
של המקור הבסיסי.
השיטות getReader()
ו-read()
לאחר מכן תוכלו לקבל גישה ל-ReadableStreamBYOBReader
על ידי הגדרת הפרמטר mode
בהתאם:
ReadableStream.getReader({ mode: "byob" })
. כך אפשר לשלוט בצורה מדויקת יותר בהקצאת המאגר כדי למנוע העתקות. כדי לקרוא מזרם הבייטים, צריך לבצע קריאה ל-ReadableStreamBYOBReader.read(view)
, כאשר view
הוא ArrayBufferView
.
דוגמת קוד של מחרוזת בייט לקריאה
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
הפונקציה הבאה מחזירה מקורות נתונים של בייטים לקריאה, שמאפשרים קריאה יעילה ללא העתקה של מערך שנוצר באופן אקראי. במקום להשתמש בגודל מקטע מוגדר מראש של 1,024, הוא מנסה למלא את מאגר הנתונים הזמני שסופק על ידי המפתח, ומאפשר שליטה מלאה.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
המנגנון של מקור נתונים לכתיבה
מקור נתונים לכתיבה הוא יעד שבו אפשר לכתוב נתונים, שמיוצגים ב-JavaScript באובייקט WritableStream
. הוא משמש כחפץ מופשט מעל sink בסיסי – sink של קלט/פלט ברמה נמוכה יותר שאליו נכתבים נתונים גולמיים.
הנתונים נכתבים בסטרימינג באמצעות סופר, מקטע אחד בכל פעם. מקטע יכול להופיע במגוון צורות, בדיוק כמו המקטעים בקורא. אתם יכולים להשתמש בכל קוד שתרצו כדי ליצור את הקטעים שיהיו מוכנים לכתיבה. הקודאי והקוד המשויך נקראים מפיק.
כשיוצרים גורם כתיבה ומתחילים לכתוב בזרם (גורם כתיבה פעיל), הוא מונע ממנו. רק גורם אחד יכול לכתוב בזרם שאפשר לכתוב בו בכל פעם. אם רוצים שגורם אחר יתחיל לכתוב בסטרימינג, בדרך כלל צריך לשחרר אותו ואז לצרף אליו גורם כתיבה אחר.
תור פנימי עוקב אחרי הקטעים שנכתבו בסטרימינג אבל עדיין לא עברו עיבוד על ידי היעד הבסיסי.
שיטת תורים היא אובייקט שמגדיר איך מקור נתונים צריך לסמן לחץ חוזר על סמך המצב של התור הפנימי שלו. אסטרטגיית ההמתנה בתור מקצה גודל לכל מקטע, ומשווים את הגודל הכולל של כל המקטעים בתור למספר מסוים שנקרא נקודת הפסגה.
המבנה הסופי נקרא בקר. לכל מקור נתונים לכתיבה יש אמצעי בקרה משויך שמאפשר לשלוט במקור הנתונים (לדוגמה, לבטל אותו).
יצירת זרם לכתיבה
הממשק WritableStream
של Streams API מספק הפשטה רגילה לכתיבה של נתוני סטרימינג ליעד, שנקרא sink. האובייקט הזה כולל לחץ חוזר (backpressure) ויצירת תורים מובנים. כדי ליצור מקור נתונים לכתיבה, צריך להפעיל את ה-constructor שלו, WritableStream()
.
יש לו פרמטר underlyingSink
אופציונלי, שמייצג אובייקט עם שיטות ומאפיינים שמגדירים את האופן שבו מופע המקור יתנהג.
underlyingSink
ה-underlyingSink
יכול לכלול את השיטות האופציונליות הבאות שהוגדרו על ידי המפתחים. הפרמטר controller
שמוענק לחלק מהשיטות הוא WritableStreamDefaultController
.
start(controller)
: ה-method הזה נקרא מיד כשהאובייקט נוצר. התוכן של השיטה הזו צריך לאפשר גישה ל-sink הבסיסי. אם התהליך הזה צריך להתבצע באופן אסינכרוני, הוא יכול להחזיר הבטחה כדי לסמן הצלחה או כישלון.write(chunk, controller)
: השיטה הזו תופעל כשמקטע נתונים חדש (שצוין בפרמטרchunk
) יהיה מוכן לכתיבה בבור הנתונים הבסיסי. הוא יכול להחזיר הבטחה כדי לסמן הצלחה או כישלון של פעולת הכתיבה. השיטה הזו תופעל רק אחרי שפעולות הכתיבה הקודמות הצליחו, אף פעם אחרי שהסטרימינג נסגר או בוטל.close(controller)
: ה-method הזה ייקרא אם האפליקציה תאותת שהיא סיימה לכתוב קטעי קוד לסטרימינג. התוכן צריך לבצע את כל הפעולות הנדרשות כדי לסיים את הכתיבה לבור הנתונים הבסיסי ולשחרר את הגישה אליו. אם התהליך הזה הוא אסינכרוני, הוא יכול להחזיר הבטחה (promise) כדי לסמן הצלחה או כישלון. המערכת תקרא לשיטה הזו רק אחרי שכל פעולות הכתיבה שנמצאות בתור יסתיימו בהצלחה.abort(reason)
: ה-method הזה ייקרא אם האפליקציה תאותת שהיא רוצה לסגור את מקור הנתונים באופן פתאומי ולהעביר אותו למצב שגיאה. הוא יכול לנקות משאבים שנשמרו, בדומה ל-close()
, אבלabort()
ייכלל גם אם יש שורות ברשימה של פעולות כתיבה. הקטעים האלה יושמדו. אם התהליך הזה הוא אסינכרוני, הוא יכול להחזיר הבטחה (promise) כדי לסמן הצלחה או כישלון. הפרמטרreason
מכיל את הערךDOMString
שמתאר את הסיבה לביטול ההעברה.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
הממשק WritableStreamDefaultController
של Streams API מייצג בקר שמאפשר לשלוט במצב של WritableStream
במהלך ההגדרה, כשמוגשים עוד קטעים לכתיבה או בסוף הכתיבה. כשיוצרים WritableStream
, למכשיר ההטמעה (sink) הבסיסי מוקצה מכונה תואמת של WritableStreamDefaultController
לצורך מניפולציה. ל-WritableStreamDefaultController
יש רק method אחד: WritableStreamDefaultController.error()
, שגורם לשגיאות בכל אינטראקציה עתידית עם הסטרימינג המשויך.
WritableStreamDefaultController
תומך גם במאפיין signal
שמחזיר מופע של AbortSignal
, שמאפשר להפסיק פעולת WritableStream
במקרה הצורך.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
הארגומנט השני של ה-constructor של WritableStream()
, שגם הוא אופציונלי, הוא queuingStrategy
.
זהו אובייקט שמגדיר אופציונלית אסטרטגיית תורים לסטרימינג, עם שני פרמטרים:
highWaterMark
: מספר לא שלילי שמציין את נקודת השיא של הסטרימינג באמצעות שיטת התור הזו.size(chunk)
: פונקציה שמחשבת ומחזירה את הגודל המוגבל והחיובי של ערך הקטע הנתון. התוצאה משמשת לקביעת לחץ החזרה, שמופיע דרך המאפיין המתאיםWritableStreamDefaultWriter.desiredSize
.
השיטות getWriter()
ו-write()
כדי לכתוב בסטרימינג שאפשר לכתוב בו, צריך גורם כתיבה, שהוא WritableStreamDefaultWriter
. השיטה getWriter()
בממשק WritableStream
מחזירה מופע חדש של WritableStreamDefaultWriter
ונועלת את הסטרימינג למופע הזה. בזמן שהסטרימינג נעול, אי אפשר לצרף סופר אחר עד שמשחררים את הסופר הנוכחי.
השיטה write()
של הממשק WritableStreamDefaultWriter
כותבת מקטע נתונים שהועברו ל-WritableStream
ולצינור הניקוז הבסיסי שלו, ואז מחזירה הבטחה שמתבררת כדי לציין אם פעולת הכתיבה הצליחה או נכשלה. חשוב לזכור שהמשמעות של 'הצלחה' תלויה ב-sink הבסיסי. ייתכן שהמשמעות היא שהקטע אושר, ולא בהכרח שהוא נשמר בבטחה ביעד הסופי שלו.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
הנכס locked
כדי לבדוק אם מקור נתונים לכתיבה נעול, אפשר לגשת לנכס WritableStream.locked
שלו.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
דוגמת קוד של מקור נתונים לכתיבה
בדוגמת הקוד שבהמשך מוצגים כל השלבים בפעולה.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
העברה של מקור נתונים לקריאה למקור נתונים לכתיבה
אפשר להעביר באמצעות צינור (pipe) מקור נתונים לקריאה למקור נתונים לכתיבה באמצעות השיטה pipeTo()
של מקור הנתונים לקריאה.
הפונקציה ReadableStream.pipeTo()
מעבירה את ReadableStream
הנוכחי באמצעות צינור ל-WritableStream
נתון, ומחזירה הבטחה שתתבצע כשתהליך ההעברה יושלם בהצלחה, או תידחה אם נתקלנו בשגיאות.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
יצירת מקור נתונים לטראנספורמציה
הממשק TransformStream
של Streams API מייצג קבוצה של נתונים שניתנים לטרנספורמציה. כדי ליצור transform stream, צריך לקרוא למבנה ה-constructor שלו, TransformStream()
, שיוצר אובייקט transform stream מהמפעילים שצוינו ומחזיר אותו. ה-constructor של TransformStream()
מקבל כארגומנט הראשון אובייקט JavaScript אופציונלי שמייצג את transformer
. אובייקטים כאלה יכולים להכיל כל אחת מהשיטות הבאות:
transformer
start(controller)
: ה-method הזה נקרא מיד כשהאובייקט נוצר. בדרך כלל משתמשים באפשרות הזו כדי להוסיף לתור קטעי קידומת באמצעותcontroller.enqueue()
. הקטעים האלה יקראו מהצד הקריא, אבל הם לא תלויים בכתיבה בצד הכתיבה. אם התהליך הראשוני הוא אסינכרוני, למשל כי נדרשת קצת מאמץ כדי לקבל את קטעי התחילית, הפונקציה יכולה להחזיר הבטחה (promise) כדי לסמן הצלחה או כישלון. הבטחה שנדחתה תגרום לשגיאה בסטרימינג. כל חריגות שייזרקו יושלחו מחדש על ידי ה-constructor שלTransformStream()
.transform(chunk, controller)
: ה-method הזה נקרא כשמקטע חדש שנכתב במקור בצד הכתיבה מוכן לטרנספורמציה. הטמעת הסטרימינג מבטיחה שהפעלת הפונקציה הזו תתבצע רק אחרי שהטרנספורמציות הקודמות הצליחו, אף פעם לא לפני שהטרנספורמציהstart()
הושלמה או אחרי שהפעלתflush()
. הפונקציה הזו מבצעת את עבודת הטרנספורמציה בפועל של מקור הנתונים המומר. הוא יכול להוסיף את התוצאות לתור באמצעותcontroller.enqueue()
. כך, מקטע אחד שנכתב בצד הכתיבה יכול להוביל לאפס או למספר מקטעים בצד הקריאה, בהתאם למספר הפעמים שבהןcontroller.enqueue()
נקרא. אם התהליך של הטרנספורמציה הוא אסינכרוני, הפונקציה הזו יכולה להחזיר הבטחה (promise) כדי לסמן את ההצלחה או הכישלון של הטרנספורמציה. אם הבטחה תידחה, תופיע שגיאה גם בצד הקריאה וגם בצד הכתיבה של מקור הנתונים לעיבוד. אם לא מציינים שיטתtransform()
, נעשה שימוש בטרנספורמציית הזהות, שמוסיפה לתור קטעי נתונים ללא שינוי מהצד שאפשר לכתוב בו לצד שאפשר לקרוא בו.flush(controller)
: ה-method הזה נקרא אחרי שכל הקטעים שנכתבו בצד הכתיבה עברו טרנספורמציה בהצלחה דרךtransform()
, והצד הכתיבה עומד להיסגר. בדרך כלל משתמשים באפשרות הזו כדי להוסיף לתור קטעי סיומת בצד הקריאה, לפני שהוא נסגר גם כן. אם תהליך השטיפה הוא אסינכרוני, הפונקציה יכולה להחזיר הבטחה כדי לסמן הצלחה או כישלון. התוצאה תימסר למבצע הקריאה שלstream.writable.write()
. בנוסף, אם הבטחה תידחה, תופיע שגיאה גם בצד הקריאה וגם בצד הכתיבה של הסטרימינג. השלכת חריגה נחשבת לאותה פעולה כמו החזרת הבטחה שנדחתה.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
אסטרטגיות התור writableStrategy
ו-readableStrategy
הפרמטרים האופציונליים השני והשלישי של ה-constructor של TransformStream()
הם אסטרטגיות אופציונליות של writableStrategy
ו-readableStrategy
לתור. הם מוגדרים כפי שמתואר בקטעים של זרמים לקריאה ולכתיבה, בהתאמה.
דוגמת קוד לטרנספורמציה של מקור נתונים
דוגמת הקוד הבאה מראה איך פועלת תצוגת מקור פשוטה של טרנספורמציה.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
העברה של מקור נתונים לקריאה דרך מקור נתונים לטרנספורמציה
השיטה pipeThrough()
של ממשק ReadableStream
מספקת דרך לשרשור של העברת הנתונים מהזרם הנוכחי דרך זרם טרנספורמציה או כל צמד אחר שאפשר לכתוב/לקרוא בו. בדרך כלל, העברת נתונים ב-pipe תנעל אותו למשך כל תקופת ההעברה, ותימנע מקוראים אחרים לנעול אותו.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
דוגמת הקוד הבאה (קצת מלאכותית) מראה איך אפשר להטמיע גרסה 'רועשת' של fetch()
שממירה את כל הטקסט לאותיות רישיות על ידי שימוש בהבטחה של התגובה שהוחזרה כזרם והמרת כל מקטע לאותיות רישיות. היתרון של הגישה הזו הוא שאין צורך להמתין להורדה של כל המסמך, וזה יכול לעשות הבדל עצום כשעובדים עם קבצים גדולים.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
הדגמה (דמו)
בהדגמה הבאה מוצגות פעולות של קריאה, כתיבה וטרנספורמציה של מקורות נתונים. הוא כולל גם דוגמאות למחרוזות צינורות של pipeThrough()
ו-pipeTo()
, וגם הדגמה של tee()
. אפשר להריץ את הדמו בחלון נפרד או להציג את קוד המקור.
מקורות מידע מועילים שזמינים בדפדפן
יש מספר מקורות מידע שימושיים שמובנים ישירות בדפדפן. אפשר ליצור בקלות ReadableStream
מ-blob. שיטת stream() של הממשק Blob
מחזירה ReadableStream
, שמחזיר את הנתונים שמכיל ה-blob בזמן הקריאה. חשוב לזכור גם שאובייקט File
הוא סוג ספציפי של Blob
, ואפשר להשתמש בו בכל הקשר שבו אפשר להשתמש ב-blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
הווריאנטים של TextDecoder.decode()
ו-TextEncoder.encode()
לשידור בזמן אמת נקראים TextDecoderStream
ו-TextEncoderStream
, בהתאמה.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
קל לדחוס או לחלץ קובץ באמצעות המרות של CompressionStream
ו-DecompressionStream
, בהתאמה. בדוגמת הקוד הבאה מוסבר איך מורידים את המפרט של Streams, דוחסים אותו (gzip) ישירות בדפדפן וכותבים את הקובץ הדחוס ישירות בדיסק.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
FileSystemWritableFileStream
של File System Access API וfetch()
request streams הניסיוניים הם דוגמאות לזרמים שאפשר לכתוב בהם בעולם האמיתי.
ב-Serial API נעשה שימוש רב בזרמים שאפשר לקרוא ולכתוב בהם.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
לבסוף, ה-API של WebSocketStream
משלב בין סטרימינג ל-WebSocket API.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
משאבים שימושיים
- מפרט Streams
- הדגמות נלוות
- Streams polyfill
- 2016 – השנה של מקורות הנתונים באינטרנט
- מחזורים וגנרטורים אסינכרונים
- Stream Visualizer
תודות
הבדיקה של המאמר בוצעה על ידי Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley ו-Adam Rice. פוסטים בבלוג של Jake Archibald עזרו לי מאוד להבין את הנושא של שידורים. חלק מדגימות הקוד מבוססות על החקירות של משתמש GitHub @bellbind, וחלק מהטקסט מבוסס במידה רבה על מסמכי התיעוד של MDN Web בנושא Streams. המחברים של Streams Standard עשו עבודה נהדרת בכתיבת המפרט הזה. התמונה הראשית היא של Ryan Lara ב-Unsplash.