כאן מוסבר איך להשתמש בזרמי קריאה, כתיבה והמרה באמצעות Streams API.
Streams API מאפשר לכם לגשת באופן פרוגרמטי לזרמי נתונים שהתקבלו ברשת או שנוצרו באמצעים מקומיים, ולעבד אותם באמצעות JavaScript. סטרימינג כולל פירוק של משאב שרוצים לקבל, לשלוח או לשנות לחלקים קטנים, ואז עיבוד של החלקים האלה בזה אחר זה. הדפדפנים ממילא מבצעים סטרימינג כשהם מקבלים נכסים כמו HTML או סרטונים שמוצגים בדפי אינטרנט, אבל האפשרות הזו לא הייתה זמינה ל-JavaScript לפני שנוספה התמיכה בסטרימינג ב-fetch
בשנת 2015.
בעבר, אם רציתם לעבד משאב כלשהו (סרטון, קובץ טקסט וכו'), הייתם צריכים להוריד את הקובץ כולו, לחכות עד שהוא יעבור דה-סריאליזציה לפורמט מתאים ואז לעבד אותו. השינויים האלה מאפשרים להשתמש ב-JavaScript כדי להזרים נתונים. מעכשיו אפשר לעבד נתונים גולמיים באמצעות JavaScript באופן הדרגתי ברגע שהם זמינים בצד הלקוח, בלי ליצור מאגר נתונים זמני, מחרוזת או blob. האפשרות הזו מאפשרת לכם להשתמש במגוון תרחישים, חלקם מפורטים בהמשך:
- אפקטים של וידאו: העברת סטרימינג של וידאו שניתן לקריאה דרך סטרימינג של טרנספורמציה שמחיל אפקטים בזמן אמת.
- דחיסה או ביטול דחיסה של נתונים: העברת זרם קבצים דרך זרם טרנספורמציה שדוחס או מבטל את הדחיסה של הנתונים באופן סלקטיבי.
- פענוח תמונה: העברת זרם של תגובת HTTP דרך זרם טרנספורמציה שמפענח בייטים של
לנתוני מפת סיביות, ואז דרך זרם טרנספורמציה נוסף שמתרגם מפות סיביות ל-PNG. אם
התקנתם את התוסף בתוך
fetch
handler של service worker, הוא מאפשר לכם לבצע polyfill באופן שקוף לפורמטים חדשים של תמונות כמו AVIF.
תמיכה בדפדפנים
ReadableStream ו-WritableStream
TransformStream
מושגי ליבה
לפני שאפרט על הסוגים השונים של הזרמות, אציג כמה מושגי ליבה.
גושים
Chunk הוא חלק נתונים יחיד שנכתב לזרם או נקרא ממנו. הוא יכול להיות מכל סוג, ויכול להיות שזרמים יכילו נתחים מסוגים שונים. ברוב המקרים, נתח לא יהיה היחידה האטומית ביותר של נתונים עבור זרם נתון. לדוגמה, יכול להיות שזרם בייטים יכיל נתחים שמורכבים מיחידות של 16 KiB Uint8Array
, במקום בייטים בודדים.
שידורים שניתנים לקריאה
מקור נתונים לקריאה מייצג מקור נתונים שאפשר לקרוא ממנו. במילים אחרות, הנתונים יוצאים מזרם נתונים שניתן לקריאה. בפועל, זרם קריא הוא מופע של המחלקה ReadableStream
.
מקורות נתונים שאפשר לכתוב בהם
מקור נתונים עם הרשאת כתיבה מייצג יעד לנתונים שאפשר לכתוב בו. במילים אחרות, הנתונים נכנסים למקור נתונים שאפשר לכתוב בו. בפועל, זרם שניתן לכתיבה הוא מופע של המחלקה WritableStream
.
שינוי השידורים
זרם טרנספורמציה מורכב מזוג זרמים: זרם שניתן לכתיבה, שנקרא הצד שניתן לכתיבה, וזרם שניתן לקריאה, שנקרא הצד שניתן לקריאה.
מטאפורה מהעולם האמיתי יכולה להיות מתורגמן סימולטני שמתרגם משפה אחת לשפה אחרת תוך כדי תנועה.
בצורה שספציפית למקור הנתונים של השינוי, כתיבה לצד שניתן לכתיבה גורמת לכך שנתונים חדשים יהיו זמינים לקריאה מהצד שניתן לקריאה. במילים אחרות, כל אובייקט עם מאפיין writable
ומאפיין readable
יכול לשמש כזרם טרנספורמציה. עם זאת, המחלקה הסטנדרטית TransformStream
מקלה על יצירת זוג כזה ששזורים בצורה נכונה.
שרשראות לצינורות
השימוש העיקרי בזרמים הוא העברה שלהם בצינורות אחד לשני. אפשר להעביר נתונים מזרם קריא ישירות לזרם שניתן לכתיבה באמצעות השיטה pipeTo()
של הזרם הקריא, או להעביר אותם דרך זרם טרנספורמציה אחד או יותר באמצעות השיטה pipeThrough()
של הזרם הקריא. קבוצה של זרמי נתונים שמחוברים יחד באמצעות צינורות נקראת שרשרת צינורות.
לחץ חוזר
אחרי שיוצרים שרשרת צינורות, היא מעבירה אותות לגבי המהירות שבה חתיכות צריכות לעבור דרכה. אם שלב כלשהו בשרשרת עדיין לא יכול לקבל נתחים, הוא מעביר אות אחורה דרך שרשרת הצינורות, עד שבסופו של דבר המקור המקורי מקבל הוראה להפסיק לייצר נתחים כל כך מהר. התהליך הזה של נרמול הזרימה נקרא לחץ חוזר.
הנחת כדור הגולף על טי
אפשר להשתמש בשיטה tee()
כדי לפצל זרם קריא (שנקרא כך על שם הצורה של האות T גדולה).
הפעולה הזו תנעל את הזרם, כלומר לא יהיה יותר אפשר להשתמש בו ישירות. עם זאת, היא תיצור שני זרמים חדשים, שנקראים ענפים, שאפשר להשתמש בהם בנפרד.
ההסתעפות חשובה גם כי אי אפשר להריץ אחורה או להפעיל מחדש את הסטרימינג. נרחיב על כך בהמשך.
איך עובד עדכון תוכן שאפשר לקרוא
מקור נתונים שניתן לקריאה מיוצג ב-JavaScript על ידי אובייקט ReadableStream
שזורם ממקור בסיסי. ה-constructor ReadableStream()
יוצר ומחזיר אובייקט של זרם קריא מהמטפלים שצוינו. יש שני סוגים של מקורות בסיסיים:
- מקורות מסוג Push שולחים לכם נתונים כל הזמן כשאתם ניגשים אליהם, ואתם יכולים להתחיל, להשהות או לבטל את הגישה למקור הנתונים. לדוגמה, שידורי וידאו חיים, אירועים שנשלחים מהשרת או WebSockets.
- מקורות משיכה מחייבים אתכם לבקש מהם נתונים באופן מפורש אחרי שמתחברים אליהם. דוגמאות כוללות פעולות HTTP באמצעות קריאות
fetch()
אוXMLHttpRequest
.
הנתונים ממקורות נתונים מסוג Stream נקראים ברצף בחלקים קטנים שנקראים chunks. אומרים שהחלקים שמוצבים בסטרימינג נוספים לתור. כלומר, הם ממתינים בתור לקריאה. תור פנימי עוקב אחרי החלקים שעדיין לא נקראו.
שיטת תור היא אובייקט שקובע איך הזרם צריך לסמן לחץ חוזר על סמך המצב של התור הפנימי שלו. במסגרת אסטרטגיית התור, מוקצה גודל לכל נתח, והגודל הכולל של כל הנתחים בתור מושווה למספר ספציפי, שנקרא high water mark.
הקטעים בתוך הזרם נקראים על ידי קורא. הקורא הזה מאחזר את הנתונים במנות, ומאפשר לכם לבצע עליהם כל פעולה שתרצו. הקורא בתוספת קוד העיבוד האחר שמשויך אליו נקרא צרכן.
המונח הבא בהקשר הזה נקרא בקר. לכל זרם קריא יש בקר משויך, שמאפשר לכם לשלוט בזרם, כפי שהשם שלו מרמז.
רק קורא אחד יכול לקרוא נתונים מזרם בכל פעם. כשיוצרים קורא והוא מתחיל לקרוא נתונים מזרם (כלומר, הוא הופך לקורא פעיל), הוא ננעל לזרם הזה. אם רוצים שקורא אחר יקרא את הנתונים מהזרם, בדרך כלל צריך לשחרר את הקורא הראשון לפני שעושים משהו אחר (אבל אפשר לפצל זרמים).
יצירת סטרימינג שניתן לקריאה
כדי ליצור מקור נתונים לקריאה, קוראים לבונה שלו ReadableStream()
.
ל-constructor יש ארגומנט אופציונלי underlyingSource
, שמייצג אובייקט עם מתודות ומאפיינים שמגדירים את אופן הפעולה של מופע הסטרים שנוצר.
underlyingSource
אפשר להשתמש בשיטות האופציונליות הבאות שמוגדרות על ידי המפתח:
-
start(controller)
: מופעל באופן מיידי כשהאובייקט נוצר. השיטה יכולה לגשת למקור הסטרימינג ולבצע כל פעולה אחרת שנדרשת להגדרת פונקציונליות הסטרימינג. אם התהליך הזה צריך להתבצע באופן אסינכרוני, השיטה יכולה להחזיר הבטחה כדי לסמן הצלחה או כישלון. הפרמטרcontroller
שמועבר לשיטה הזו הואReadableStreamDefaultController
. -
pull(controller)
: אפשר להשתמש בפרמטר הזה כדי לשלוט בסטרימינג כשמאחזרים עוד נתונים. הפונקציה הזו מופעלת שוב ושוב כל עוד התור הפנימי של חלקי הנתונים בזרם לא מלא, עד שהתור מגיע לסימן הגבול העליון שלו. אם התוצאה של הקריאה ל-pull()
היא הבטחה, לא תתבצע קריאה נוספת ל-pull()
עד שההבטחה תתממש. אם ההבטחה נדחית, הסטרים יקבל שגיאה. -
cancel(reason)
: נקראת כשצרכן הנתונים של הזרם מבטל את הזרם.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
הפונקציה ReadableStreamDefaultController
תומכת בשיטות הבאות:
-
ReadableStreamDefaultController.close()
סוגר את הזרם המשויך. -
ReadableStreamDefaultController.enqueue()
מוסיפה נתח נתון לתור בזרם המשויך. ReadableStreamDefaultController.error()
גורם לשגיאה בכל האינטראקציות העתידיות עם הסטרימינג המשויך.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
הארגומנט השני של הקונסטרוקטור ReadableStream()
, שהוא אופציונלי, הוא queuingStrategy
.
זהו אובייקט שאפשר להגדיר בו אסטרטגיית תור להפעלת הסטרימינג. האובייקט מקבל שני פרמטרים:
-
highWaterMark
: מספר לא שלילי שמציין את נקודת השיא של הזרם באמצעות אסטרטגיית התור הזו. -
size(chunk)
: פונקציה שמחשבת ומחזירה את הגודל הסופי הלא שלילי של ערך הנתח הנתון. התוצאה משמשת לקביעת לחץ חוזר, שמוצג באמצעות המאפיין המתאיםReadableStreamDefaultController.desiredSize
. היא גם קובעת מתי מתבצעת קריאה לשיטהpull()
של המקור הבסיסי.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
השיטות getReader()
ו-read()
כדי לקרוא מזרם שניתן לקריאה, צריך קורא, שהוא ReadableStreamDefaultReader
.
השיטה getReader()
של הממשק ReadableStream
יוצרת קורא ונועלת את הזרם עבורו. בזמן שהזרם נעול, אי אפשר להשיג קורא אחר עד שהקורא הנוכחי ישוחרר.
השיטה read()
של הממשק ReadableStreamDefaultReader
מחזירה הבטחה שמאפשרת גישה לחלק הבא בתור הפנימי של הזרם. היא מחזירה תוצאה בהתאם למצב הזרם. אלה האפשרויות השונות:
- אם יש נתח זמין, ההבטחה תתממש עם אובייקט מהצורה
{ value: chunk, done: false }
. - אם הזרם ייסגר, ההבטחה תתממש עם אובייקט מהצורה
{ value: undefined, done: true }
. - אם הסטרימינג ייכשל, ההבטחה תידחה עם השגיאה הרלוונטית.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
הנכס locked
כדי לבדוק אם סטרימינג שניתן לקריאה נעול, אפשר לגשת למאפיין ReadableStream.locked
שלו.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
דוגמאות קוד של זרם נתונים שניתן לקריאה
בדוגמה הבאה של קוד אפשר לראות את כל השלבים בפעולה. קודם יוצרים ReadableStream
שבארגומנט underlyingSource
שלו (כלומר, המחלקה TimestampSource
) מוגדרת שיטת start()
.
בשיטה הזו, המערכת של controller
בשידור החי enqueue()
חותמת זמן כל שנייה במשך עשר שניות.
לבסוף, הוא אומר לבקר close()
את הזרם. כדי להשתמש בשידור הזה, יוצרים קורא באמצעות ה-method getReader()
וקוראים ל-method read()
עד שהשידור done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
איטרציה אסינכרונית
יכול להיות ש-API שבודק בכל איטרציה של לולאת read()
אם הסטרים הוא done
לא יהיה הכי נוח.
למזלנו, בקרוב תהיה דרך טובה יותר לעשות את זה: איטרציה אסינכרונית.
for await (const chunk of stream) {
console.log(chunk);
}
פתרון עקיף לשימוש באיטרציה אסינכרונית היום הוא הטמעה של ההתנהגות באמצעות polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
הפניית זרם נתונים לקריאה
השיטה tee()
של הממשק ReadableStream
מכינה את הזרם הנוכחי שניתן לקריאה, ומחזירה מערך עם שני רכיבים שמכיל את שני הענפים שנוצרו כמופעים חדשים של ReadableStream
. כך שני קוראים יכולים לקרוא נתונים מזרם בו-זמנית. לדוגמה, אפשר לעשות את זה בקובץ service worker אם רוצים לאחזר תגובה מהשרת ולהזרים אותה לדפדפן, אבל גם להזרים אותה למטמון של קובץ ה-service worker. אי אפשר להשתמש בגוף התגובה יותר מפעם אחת, ולכן צריך שני עותקים. כדי לבטל את הזרם, צריך לבטל את שני הענפים שנוצרו. בדרך כלל, כשמכינים זרם לשידור, הוא ננעל למשך השידור כדי למנוע מקוראים אחרים לנעול אותו.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
זרמי בייטים שניתנים לקריאה
לזרמים שמייצגים בייטים, מסופקת גרסה מורחבת של הזרם שניתן לקריאה כדי לטפל בבייטים בצורה יעילה, במיוחד על ידי צמצום העותקים. זרמי בייטים מאפשרים להשיג קוראים עם מאגר משלהם (BYOB). ההטמעה שמוגדרת כברירת מחדל יכולה לתת מגוון פלטים שונים, כמו מחרוזות או מאגרי מערכים במקרה של WebSockets, בעוד שזרמי בייטים מבטיחים פלט בייטים. בנוסף, קוראים שמביאים את הדפדפן שלהם נהנים מיתרונות של יציבות. הסיבה לכך היא שאם מאגר נתונים זמני מנותק, אפשר להבטיח שלא תתבצע כתיבה לאותו מאגר נתונים זמני פעמיים, וכך נמנעים מצבי מירוץ. קוראי BYOB יכולים להפחית את מספר הפעמים שהדפדפן צריך לפעול איסוף אשפה, מכיוון שהוא יכול לעשות שימוש חוזר במאגרים.
יצירת זרם בייטים שניתן לקריאה
אפשר ליצור זרם בייטים שניתן לקריאה על ידי העברת פרמטר נוסף של type
אל בנאי ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
המקור הבסיסי של זרם בייטים שניתן לקריאה מקבל ReadableByteStreamController
כדי לבצע בו מניפולציה. השיטה ReadableByteStreamController.enqueue()
שלה מקבלת ארגומנט chunk
שהערך שלו הוא ArrayBufferView
. המאפיין ReadableByteStreamController.byobRequest
מחזיר את בקשת המשיכה הנוכחית של BYOB, או null אם אין כזו. לבסוף, המאפיין ReadableByteStreamController.desiredSize
מחזיר את הגודל הרצוי למילוי התור הפנימי של הזרם המבוקר.
queuingStrategy
הארגומנט השני של הקונסטרוקטור ReadableStream()
, שהוא אופציונלי, הוא queuingStrategy
.
זהו אובייקט שאופציונלית מגדיר אסטרטגיית תור לסטרימינג, שמקבל פרמטר אחד:
-
highWaterMark
: מספר לא שלילי של בייטים שמציין את נקודת השיא של הזרם באמצעות אסטרטגיית התור הזו. הערך הזה משמש לקביעת לחץ חוזר, שמוצג באמצעות המאפיין המתאיםReadableByteStreamController.desiredSize
. היא גם קובעת מתי מתבצעת קריאה לשיטהpull()
של המקור הבסיסי.
השיטות getReader()
ו-read()
לאחר מכן תוכלו לקבל גישה ל-ReadableStreamBYOBReader
על ידי הגדרת הפרמטר mode
בהתאם:
ReadableStream.getReader({ mode: "byob" })
. כך אפשר לשלוט בצורה מדויקת יותר בהקצאת המאגר כדי להימנע מהעתקות. כדי לקרוא מזרם הבייטים, צריך להתקשר אל ReadableStreamBYOBReader.read(view)
, כאשר view
הוא ArrayBufferView
.
דוגמת קוד של זרם בייטים שניתן לקריאה
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
הפונקציה הבאה מחזירה זרמי בייטים שניתנים לקריאה, ומאפשרת קריאה יעילה של מערך שנוצר באופן אקראי ללא העתקה. במקום להשתמש בגודל נתח שנקבע מראש של 1,024, הוא מנסה למלא את המאגר שסופק על ידי המפתח, וכך מאפשר שליטה מלאה.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
המכניקה של זרם שניתן לכתיבה
זרם שאפשר לכתוב בו הוא יעד שאפשר לכתוב בו נתונים, שמיוצג ב-JavaScript על ידי אובייקט WritableStream
. השכבה הזו משמשת כהפשטה מעל מאגר נתונים בסיסי – מאגר קלט/פלט ברמה נמוכה יותר, שאליו נכתבים נתונים גולמיים.
הנתונים נכתבים למקור הנתונים באמצעות writer, מקטע אחד בכל פעם. נתח יכול להופיע במגוון צורות, כמו הנתחים בקורא. אתם יכולים להשתמש בכל קוד שתרצו כדי ליצור את החלקים שמוכנים לכתיבה. הכותב והקוד המשויך נקראים יוצר.
כשיוצרים רכיב writer והוא מתחיל לכתוב לזרם (רכיב writer פעיל), אומרים שהוא נעול לזרם. רק כותב אחד יכול לכתוב לזרם שניתן לכתיבה בכל פעם. אם רוצים שכותב אחר יתחיל לכתוב לזרם, בדרך כלל צריך לשחרר אותו לפני שמצרפים אליו כותב אחר.
תור פנימי עוקב אחרי נתחי הנתונים שנכתבו לזרם אבל עדיין לא עברו עיבוד על ידי יעד הבסיס.
שיטת תור היא אובייקט שקובע איך הזרם צריך לסמן לחץ חוזר על סמך המצב של התור הפנימי שלו. במסגרת אסטרטגיית התור, מוקצה גודל לכל נתח, והגודל הכולל של כל הנתחים בתור מושווה למספר ספציפי, שנקרא high water mark.
המבנה הסופי נקרא בקר. לכל זרם שניתן לכתיבה משויך בקר שמאפשר לכם לשלוט בזרם (לדוגמה, לבטל אותו).
יצירת זרם שניתן לכתיבה
ממשק WritableStream
של Streams API מספק הפשטה סטנדרטית לכתיבת נתונים זורמים ליעד, שנקרא sink. האובייקט הזה מגיע עם לחץ חוזר ועם תור מובנים. כדי ליצור זרם שאפשר לכתוב בו, קוראים לבונה שלו WritableStream()
.
יש לו פרמטר אופציונלי underlyingSink
שמייצג אובייקט עם שיטות ומאפיינים שמגדירים את אופן הפעולה של מופע הסטרים שנוצר.
underlyingSink
ה-underlyingSink
יכול לכלול את השיטות האופציונליות הבאות שמוגדרות על ידי המפתחים. הפרמטר controller
שמועבר לחלק מהשיטות הוא WritableStreamDefaultController
.
-
start(controller)
: המערכת קוראת לשיטה הזו באופן מיידי כשהאובייקט נוצר. התוכן של השיטה הזו צריך להיות מכוון לקבלת גישה למאגר הבסיסי. אם התהליך הזה צריך להתבצע באופן אסינכרוני, הוא יכול להחזיר הבטחה כדי לסמן הצלחה או כישלון. -
write(chunk, controller)
: השיטה הזו תופעל כשנתח חדש של נתונים (שצוין בפרמטרchunk
) יהיה מוכן לכתיבה למאגר הבסיסי. היא יכולה להחזיר הבטחה כדי לציין הצלחה או כישלון של פעולת הכתיבה. השיטה הזו תיקרא רק אחרי שפעולות הכתיבה הקודמות יצליחו, ואף פעם לא אחרי שהזרם ייסגר או יבוטל. -
close(controller)
: הקריאה ל-method הזו תתבצע אם האפליקציה תאותת שהיא סיימה לכתוב נתונים לחלקים בזרם. התוכן צריך לבצע את כל הפעולות שנדרשות כדי לסיים את הכתיבה למאגר הבסיסי ולשחרר את הגישה אליו. אם התהליך הזה הוא אסינכרוני, הוא יכול להחזיר הבטחה (promise) כדי לסמן הצלחה או כישלון. השיטה הזו תיקרא רק אחרי שכל הפעולות של כתיבה בתור יסתיימו בהצלחה. -
abort(reason)
: קוד ה-method הזה יופעל אם האפליקציה תאותת שהיא רוצה לסגור את הסטרימינג באופן פתאומי ולהעביר אותו למצב שגיאה. היא יכולה לנקות משאבים מושהים, בדומה ל-close()
, אבלabort()
תיקרא גם אם פעולות כתיבה ממתינות בתור. החלקים האלה יימחקו. אם התהליך הזה הוא אסינכרוני, הוא יכול להחזיר הבטחה כדי לסמן הצלחה או כשלון. הפרמטרreason
מכילDOMString
שמתאר למה הופסקה ההעברה.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
ממשק WritableStreamDefaultController
של Streams API מייצג בקר שמאפשר שליטה במצב של WritableStream
במהלך ההגדרה, כשמוסיפים עוד נתונים לכתיבה או בסיום הכתיבה. כשיוצרים WritableStream
, יעד הבסיס מקבל מופע WritableStreamDefaultController
תואם לשינוי. ל-WritableStreamDefaultController
יש רק method אחד:
WritableStreamDefaultController.error()
,
שגורם לשגיאה בכל האינטראקציות העתידיות עם הסטרים המשויך.
WritableStreamDefaultController
תומך גם במאפיין signal
שמחזיר מופע של AbortSignal
, ומאפשר לעצור פעולה של WritableStream
אם צריך.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
הארגומנט השני של הקונסטרוקטור WritableStream()
, שהוא אופציונלי, הוא queuingStrategy
.
זהו אובייקט שאפשר להגדיר בו אסטרטגיית תור להפעלת הסטרימינג. האובייקט מקבל שני פרמטרים:
-
highWaterMark
: מספר לא שלילי שמציין את נקודת השיא של הזרם באמצעות אסטרטגיית התור הזו. -
size(chunk)
: פונקציה שמחשבת ומחזירה את הגודל הסופי הלא שלילי של ערך הנתח הנתון. התוצאה משמשת לקביעת לחץ חוזר, שמוצג באמצעות המאפיין המתאיםWritableStreamDefaultWriter.desiredSize
.
השיטות getWriter()
ו-write()
כדי לכתוב לזרם שאפשר לכתוב בו, צריך writer, שהוא WritableStreamDefaultWriter
. השיטה getWriter()
של הממשק WritableStream
מחזירה מופע חדש של WritableStreamDefaultWriter
ונועלת את הזרם לאותו מופע. בזמן שהסטרים נעול, אף כותב אחר לא יכול לקבל גישה אליו עד שהכותב הנוכחי ישחרר אותו.
השיטה write()
של הממשק WritableStreamDefaultWriter
כותבת נתונים שעברו לחלק WritableStream
ולמקור הבסיסי שלו, ואז מחזירה הבטחה שמובילה לציון ההצלחה או הכישלון של פעולת הכתיבה. שימו לב: המשמעות של 'הצלחה' תלויה ביעד הבסיסי. יכול להיות שהיא מציינת שהנתונים התקבלו, ולא בהכרח שהם נשמרו בבטחה ביעד הסופי.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
הנכס locked
כדי לבדוק אם זרם שאפשר לכתוב בו נעול, אפשר לגשת למאפיין WritableStream.locked
שלו.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
דוגמת קוד של זרם שניתן לכתיבה
בדוגמה הבאה של קוד אפשר לראות את כל השלבים בפעולה.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
העברת נתונים מזרם שאפשר לקרוא ממנו לזרם שאפשר לכתוב בו
אפשר להעביר נתונים מזרם קריאה לזרם כתיבה באמצעות השיטה pipeTo()
של זרם הקריאה.
הפונקציה ReadableStream.pipeTo()
מעבירה את ה-ReadableStream
הנוכחי ל-WritableStream
נתון ומחזירה הבטחה שמתקיימת כשתהליך ההעברה מסתיים בהצלחה, או נדחית אם מתגלות שגיאות.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
יצירת זרם טרנספורמציה
ממשק TransformStream
של Streams API מייצג קבוצה של נתונים שניתנים לשינוי. יוצרים זרם טרנספורמציה על ידי קריאה לבונה שלו TransformStream()
, שיוצר ומחזיר אובייקט של זרם טרנספורמציה מהמטפלים שצוינו. ה-constructor TransformStream()
מקבל כארגומנט הראשון שלו אובייקט JavaScript אופציונלי שמייצג את transformer
. אובייקטים כאלה יכולים להכיל את השיטות הבאות:
transformer
-
start(controller)
: המערכת קוראת לשיטה הזו באופן מיידי כשהאובייקט נוצר. בדרך כלל, משתמשים בשיטה הזו כדי להוסיף לתור נתחים של קידומות, באמצעותcontroller.enqueue()
. החלקים האלה ייקראו מהצד שניתן לקריאה, אבל הם לא תלויים בכתיבה לצד שניתן לכתיבה. אם התהליך הראשוני הזה הוא אסינכרוני, למשל כי נדרש מאמץ מסוים כדי להשיג את נתחי התחילית, הפונקציה יכולה להחזיר הבטחה כדי לסמן הצלחה או כישלון. הבטחה שנדחתה תגרום לשגיאה בזרם. כל החריגים שמועברים יועברו מחדש על ידי הבונהTransformStream()
. -
transform(chunk, controller)
: קוראים ל-method הזה כשנתח חדש שנכתב במקור בצד שניתן לכתיבה מוכן להמרה. ההטמעה של הסטרימינג מבטיחה שהפונקציה הזו תיקרא רק אחרי שהטרנספורמציות הקודמות יסתיימו בהצלחה, ואף פעם לפני ש-start()
יסתיים או אחרי ש-flush()
ייקרא. הפונקציה הזו מבצעת את העבודה בפועל של שינוי הנתונים בזרם. אפשר להוסיף את התוצאות לתור באמצעותcontroller.enqueue()
. כך, נתח יחיד שנכתב בצד שניתן לכתיבה יכול להניב אפס או כמה נתחים בצד שניתן לקריאה, בהתאם למספר הפעמים שבהן מתבצעת הקריאה ל-controller.enqueue()
. אם תהליך ההמרה הוא אסינכרוני, הפונקציה הזו יכולה להחזיר הבטחה כדי לסמן הצלחה או כישלון של ההמרה. אם ההבטחה נדחית, תהיה שגיאה גם בצדדים שניתן לקרוא ולכתוב של זרם ההמרה. אם לא מסופקת שיטתtransform()
, נעשה שימוש בשינוי הזהות, שמעביר את החלקים ללא שינוי מהצד שניתן לכתיבה לצד שניתן לקריאה. -
flush(controller)
: הקריאה ל-method הזו מתבצעת אחרי שכל הנתונים שנכתבו בצד שניתן לכתיבה עברו טרנספורמציה בהצלחה דרךtransform()
, ולפני שהצד שניתן לכתיבה נסגר. בדרך כלל משתמשים בשיטה הזו כדי להוסיף לחלק הקריא נתחים של סיומות לתור, לפני שגם הוא נסגר. אם תהליך הניקוי הוא אסינכרוני, הפונקציה יכולה להחזיר הבטחה כדי לציין הצלחה או כשלון. התוצאה תועבר למי שקורא לפונקציהstream.writable.write()
. בנוסף, הבטחה שנדחתה תגרום לשגיאה גם בצד הקריאה וגם בצד הכתיבה של הזרם. העלאת חריגה נחשבת זהה להחזרת הבטחה שנדחתה.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
אסטרטגיות התור writableStrategy
ו-readableStrategy
הפרמטרים השני והשלישי האופציונליים של בנאי TransformStream()
הם אופציונליים
writableStrategy
וreadableStrategy
אסטרטגיות להוספה לתור. הם מוגדרים כמו שמתואר בקטעים readable ו-writable של הזרם, בהתאמה.
דוגמת קוד לשינוי של זרם
בדוגמת הקוד הבאה אפשר לראות זרם טרנספורמציה פשוט בפעולה.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
העברת נתונים מזרם שניתן לקריאה לזרם של טרנספורמציה
השיטה pipeThrough()
של הממשק ReadableStream
מספקת דרך שניתן לשרשר בה את הזרם הנוכחי דרך זרם טרנספורמציה או כל צמד אחר של זרם שניתן לכתיבה ולקריאה. העברת נתונים בצינור בדרך כלל נועלת את הצינור למשך ההעברה, ומונעת מקוראים אחרים לנעול אותו.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
בדוגמת הקוד הבאה (קצת מורכבת) אפשר לראות איך להטמיע גרסה של fetch()
שבה כל הטקסט מופיע באותיות רישיות. כדי לעשות את זה, צריך להשתמש בהבטחה לתגובה שמוחזרת כזרם ולהפוך כל נתח לאותיות רישיות. היתרון בגישה הזו הוא שלא צריך לחכות עד שהמסמך כולו יורד, וזה יכול להיות משמעותי כשמדובר בקבצים גדולים.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
הדגמה (דמו)
בהדגמה שלמטה אפשר לראות זרמי נתונים שניתנים לקריאה, לכתיבה ולשינוי בפעולה. הוא כולל גם דוגמאות לשרשראות של צינורות pipeThrough()
ו-pipeTo()
, ומדגים גם את tee()
. אפשר להריץ את ההדגמה בחלון נפרד או להציג את קוד המקור.
זרמים שימושיים שזמינים בדפדפן
יש מספר שידורים שימושיים שמוטמעים ישירות בדפדפן. אפשר ליצור בקלות ReadableStream
מ-blob. השיטה stream() של הממשק Blob
מחזירה ReadableStream
, שכשקוראים אותו הוא מחזיר את הנתונים שכלולים ב-blob. כדאי גם לזכור שאובייקט File
הוא סוג ספציפי של Blob
, ואפשר להשתמש בו בכל הקשר שבו אפשר להשתמש ב-blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
גרסאות הסטרימינג של TextDecoder.decode()
ו-TextEncoder.encode()
נקראות TextDecoderStream
ו-TextEncoderStream
בהתאמה.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
קל לדחוס או לחלץ קובץ באמצעות זרמי השינוי CompressionStream
ו-DecompressionStream
בהתאמה. בדוגמת הקוד שלמטה אפשר לראות איך להוריד את מפרט ה-Streams, לדחוס אותו (gzip) ישירות בדפדפן ולכתוב את הקובץ הדחוס ישירות לדיסק.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
File System Access API, FileSystemWritableFileStream
וfetch()
request streams הניסיוניים הם דוגמאות לזרמי נתונים שאפשר לכתוב אליהם.
ב-Serial API נעשה שימוש רב בסטרימינג לקריאה ולכתיבה.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
לבסוף, ה-API WebSocketStream
משלב סטרימינג עם WebSocket API.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
משאבים שימושיים
- מפרט פיד הזרמים
- הדגמות נלוות
- Streams polyfill
- 2016 – השנה של מקורות נתונים באינטרנט
- איטרטורים וגנרטורים אסינכרוניים
- Stream Visualizer
תודות
המאמר הזה נבדק על ידי ג'ייק ארצ'יבלד, פרנסואה בופור, סם דאטון, מתיאס בולנס, Surma, ג'ו מדלי ואדם רייס. הפוסטים בבלוג של Jake Archibald עזרו לי מאוד להבין את המושג streams. חלק מדוגמאות הקוד מבוססות על מחקרים של משתמש GitHub @bellbind, וחלקים מהטקסט מבוססים על MDN Web Docs on Streams. היוצרים של Streams Standard עשו עבודה מצוינת בכתיבת המפרט הזה. התמונה הראשית (Hero) היא של Ryan Lara ב-Unsplash.