تعرَّف على كيفية استخدام مصادر البيانات القابلة للقراءة والكتابة والتحويل باستخدام واجهة برمجة التطبيقات Streams API.
تتيح لك واجهة برمجة التطبيقات Streams API الوصول آليًا إلى مصادر البيانات التي يتم تلقّيها عبر الشبكة
أو التي يتم إنشاؤها بأي وسيلة على الجهاز، ثم معالجتها باستخدام JavaScript. يتضمن البث تقسيم مورد تريد تلقّيه أو إرساله أو تحويله
إلى أجزاء صغيرة، ثم معالجة هذه الأجزاء تدريجيًا. على الرغم من أنّ البث المباشر هو ميزة توفّرها المتصفّحات بشكلٍ تلقائي عند تلقّي مواد عرض مثل HTML أو الفيديوهات لعرضها على صفحات الويب، لم تكن هذه الميزة متاحة مطلقًا في JavaScript قبل أن يتم طرح fetch
مع أحداث البث في عام 2015.
في السابق، إذا أردت معالجة مورد من نوع ما (سواء كان فيديو أو ملف نصي أو غير ذلك)، كان عليك تنزيل الملف بأكمله والانتظار إلى أن يتم تحويله إلى تنسيق مناسب، ثم معالجته. مع توفّر أحداث البث لسمة JavaScript، سيتغيّر كل ذلك. يمكنك الآن معالجة البيانات الأولية باستخدام JavaScript بشكل تدريجي بمجرد توفّرها على العميل، بدون الحاجة إلى إنشاء ذاكرة تخزين مؤقت أو سلسلة أو ملف نصي. يتيح ذلك عددًا من حالات الاستخدام، ونذكر بعضها أدناه:
- تأثيرات الفيديو: يتم توجيه بث فيديو قابل للقراءة من خلال بث تحويل يطبّق تأثيرات في الوقت الفعلي.
- (فك) ضغط البيانات: يتم توجيه مصدر ملفات من خلال مصدر تحويل (فك) يضغطه ويفك ضغطه بشكل انتقائي.
- فك ترميز الصور: يتم توجيه تدفق استجابة HTTP من خلال تدفق تحويل يفك ترميز البايتات ويحوّلها إلى بيانات مخطّط بياني، ثم من خلال تدفق تحويل آخر يحوّل المخطّطات البيانية إلى ملفات بتنسيق PNG. في حال
التثبيت داخل معالِج
fetch
لأحد موظّفي الخدمة، يتيح لك ذلك استخدام polyfill بشكل شفاف لتنسيقات الصور الجديدة، مثل AVIF.
دعم المتصفح
ReadableStream وWritableStream
TransformStream
المفاهيم الأساسية
قبل أن أقدّم لك تفاصيل عن الأنواع المختلفة من أحداث البث، سأقدّم لك بعض المفاهيم الأساسية.
أجزاء
القطعة هي جزء واحد من البيانات يتم كتابته في مصدر بيانات أو قراءته منه. يمكن أن يكون من أي نوع، ويمكن أن تحتوي أحداث البث على أجزاء من أنواع مختلفة. في معظم الأحيان، لن تكون المجموعة هي أصدق
وحدة بيانات لبث معيّن. على سبيل المثال، قد يحتوي بث البايتات على أجزاء تتألف من 16
وحدة Uint8Array
كيلوبايت، بدلاً من وحدات بايت فردية.
مصادر البيانات القابلة للقراءة
يمثّل مصدر البيانات القابل للقراءة مصدر بيانات يمكنك القراءة منه. بعبارة أخرى، تأتي البيانات من مصدر بيانات قابل للقراءة. على وجه التحديد، بث قابل للقراءة هو مثيل لفئة ReadableStream
.
مصادر البيانات القابلة للكتابة
يمثّل مصدر البيانات القابل للكتابة وجهة للبيانات التي يمكنك الكتابة فيها. بعبارة أخرى، يتم إدخال البيانات في مصدر قابل للكتابة. على وجه التحديد، تيار قابل للكتابة هو مثيل لفئة
WritableStream
.
تحويل أحداث البث
يتكوّن مسار التحويل من زوج من المسارات: مسار قابل للكتابة، يُعرف باسم الجانب القابل للكتابة،
ومسار قابل للقراءة، يُعرف باسم الجانب القابل للقراءة.
يمكن تشبيه هذه الميزة بأحد المترجمين الفوريين الذين يترجمون من لغة إلى أخرى أثناء التحدث.
بطريقة خاصة بمصدر البيانات المخصّص للتحويل، تؤدّي الكتابة
إلى الجانب القابل للكتابة إلى إتاحة بيانات جديدة للقراءة من
الجانب القابل للقراءة. على وجه التحديد، يمكن لأي عنصر يتضمّن سمة writable
وسمة readable
أن يُستخدَم
كمجموعة بث تحويل. ومع ذلك، تسهِّل فئة TransformStream
العادية إنشاء
مثل هذا الزوج المتشابك بشكل صحيح.
سلاسل الأنابيب
يتم استخدام مصادر البيانات بشكل أساسي من خلال توجيهها إلى بعضها. يمكن توجيه بث قابل للقراءة مباشرةً
إلى بث قابل للكتابة، وذلك باستخدام الطريقة pipeTo()
للبث القابل للقراءة، أو يمكن توجيهه من خلال بث واحد
أو أكثر من عمليات التحويل أولاً، وذلك باستخدام الطريقة pipeThrough()
للبث القابل للقراءة. ويُطلق على مجموعة من
البثّات التي تم ربطها معًا بهذه الطريقة اسم سلسلة البث.
الضغط الخلفي
بعد إنشاء سلسلة قنوات، ستنشر إشارات بشأن سرعة تدفق الأجزاء من خلالها. إذا لم تتمكّن أي خطوة في السلسلة من قبول أجزاء بعد، يتم نشر إشارة للخلف من خلال سلسلة الأنابيب، إلى أن يتم في النهاية إبلاغ المصدر الأصلي بإيقاف إنتاج الأجزاء بهذه السرعة. تُعرف عملية تطبيع التدفق هذه باسم الضغط الخلفي.
Teeing
يمكن بدء بث قابل للقراءة (يُسمى هذا الإجراء باسم شكل الحرف الكبير "T") باستخدام طريقة tee()
.
سيؤدي ذلك إلى قفل البث، أي لن يعود قابلاً للاستخدام مباشرةً، ولكن سيؤدي إلى إنشاء سلسلتَي بث
جديدتَين، تُعرفان باسم الفروع، ويمكن استخدامهما بشكل مستقل.
من المهم أيضًا ضبط البث مسبقًا لأنّه لا يمكن ترجيع البث أو إعادة تشغيله. وسنوضّح لك المزيد من المعلومات حول هذا الموضوع لاحقًا.
آليات البث القابل للقراءة
مصدر البيانات القابل للقراءة هو مصدر بيانات يتم تمثيله في JavaScript من خلال عنصر
ReadableStream
الذي
يتدفق من مصدر أساسي. تنشئ الدالة الإنشائية
ReadableStream()
كائن بث قابل للقراءة من معالِجات معيّنة، وتعرضه. هناك نوعان
من المصادر الأساسية:
- تُرسِل مصادر الإرسال البيانات إليك باستمرار عند الوصول إليها، ويكون بإمكانك بدء البث أو إيقافه مؤقتًا أو إلغاء الوصول إليه. وتشمل الأمثلة أحداث البث المباشر للفيديو أو الأحداث المُرسَلة من الخادم أو WebSockets.
- تتطلّب مصادر السحب منك طلب البيانات منها صراحةً بعد الربط بها. وتشمل الأمثلة
عمليات HTTP من خلال طلبات
fetch()
أوXMLHttpRequest
.
تتم قراءة بيانات البث بشكل تسلسلي في أجزاء صغيرة تُعرف باسم المقاطع. يُقال إنّ الأجزاء التي يتم وضعها في بث مُدرَجة في قائمة الانتظار. وهذا يعني أنّه في انتظار المراجعة. تتتبّع القائمة الداخلية الأجزاء التي لم تتم قراءتها بعد.
استراتيجية وضع المحتوى في قائمة الانتظار هي عنصر يحدّد كيفية إرسال بث إشارة الضغط الخلفي استنادًا إلى حالة قائمة الانتظار الداخلية. تحدّد استراتيجية وضع المحتوى في "قائمة المحتوى التالي" حجمًا لكل جزء، وتقارن بين الحجم الإجمالي لجميع الأجزاء في "قائمة المحتوى التالي" وعدد محدّد يُعرف باسم الحد الأقصى المسموح به.
يقرأ قارئ الأجزاء داخل البث. يسترجع هذا القارئ البيانات في قطعة واحدة في كل مرة، ما يتيح لك إجراء أي نوع من العمليات التي تريد إجراؤها عليها. يُطلق على القارئ بالإضافة إلى رمز المعالجة الآخر المصاحب له اسم مستهلك.
يُطلق على البنية التالية في هذا السياق اسم عنصر تحكّم. يحتوي كل مصدر بيانات قابل للقراءة على عنصر تحكّم مرتبط به، كما يوحي الاسم، يتيح لك التحكّم في مصدر البيانات.
يمكن لقارئ واحد فقط قراءة مصدر بيانات في كل مرة. عند إنشاء قارئ وبدء قراءة مصدر بيانات، (أي يصبح قارئًا نشطًا)، يتم قفله عليه. إذا أردت أن يتولى قارئ آخر قراءة البث، عليك عادةً إلغاء قراءة القارئ الأول قبل تنفيذ أي إجراء آخر (على الرغم من أنّه يمكنك تقسيم أحداث البث).
إنشاء مصدر بيانات قابل للقراءة
يمكنك إنشاء مصدر بيانات قابل للقراءة من خلال استدعاء عنصر الإنشاء
ReadableStream()
.
يحتوي المُنشئ على وسيطة اختيارية underlyingSource
، والتي تمثّل كائنًا
يتضمّن طُرقًا وسمات تحدّد سلوك مثيل البث الذي تم إنشاؤه.
فريق underlyingSource
ويمكن أن يستخدِم هذا الإجراء الطرق الاختيارية التالية التي يحدّدها المطوّر:
-
start(controller)
: يتمّ استدعاؤه على الفور عند إنشاء العنصر. يمكن للطريقة الوصول إلى مصدر البث وتنفيذ أي إجراء آخر مطلوب لإعداد وظيفة البث. إذا كان من المفترض تنفيذ هذه العملية بشكل غير متزامن، يمكن للطريقة عرض وعد للإشارة إلى النجاح أو الفشل. المعلَمةcontroller
التي تم تمريرها إلى هذه الطريقة هي aReadableStreamDefaultController
. pull(controller)
: يمكن استخدامها للتحكّم في البث أثناء جلب المزيد من الأجزاء. ويتم استدعاؤه مراراً وتكراراً ما دامت قائمة الانتظار الداخلية للبث غير ممتلئة، إلى أن تصل قائمة الانتظار إلى الحد الأقصى. إذا كانت نتيجة استدعاءpull()
هي وعد، لن يتم استدعاءpull()
مرة أخرى إلى أن يتم الوفاء بهذا الوعد. إذا تم رفض الوعد، سيظهر خطأ في البث.-
cancel(reason)
: يتمّ استدعاؤه عندما يلغي مستخدِم البث البث.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
تتيح ReadableStreamDefaultController
الطرق التالية:
ReadableStreamDefaultController.close()
يؤدي إلى إغلاق البث المرتبط.ReadableStreamDefaultController.enqueue()
تضيف هذه الدالة قطعة معيّنة إلى "قائمة الانتظار" في البث المرتبط.- يؤدي
ReadableStreamDefaultController.error()
إلى حدوث خطأ في أي تفاعلات مستقبلية مع البث المرتبط.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
فريق queuingStrategy
الوسيطة الثانية، وهي اختيارية أيضًا، لدالة الإنشاء ReadableStream()
هي queuingStrategy
.
وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في "قائمة المحتوى التالي" للبث، ويأخذ مَعلمتَين:
highWaterMark
: رقم غير سالب يشير إلى الحد الأقصى للبث باستخدام استراتيجية الانتظار هذه.size(chunk)
: دالة تحسب الحجم المحدّد غير السالب لقيمة الجزء المحدّدة وتعرضه. وتُستخدَم النتيجة لتحديد الضغط الخلفي الذي يظهر من خلال السمةReadableStreamDefaultController.desiredSize
المناسبة. ويحدِّد أيضًا حالات استدعاء طريقةpull()
للمصدر الأساسي.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
الطريقتان getReader()
وread()
للقراءة من مصدر قابل للقراءة، تحتاج إلى قارئ، وهو ReadableStreamDefaultReader
.
تنشئ طريقة getReader()
لواجهة ReadableStream
قارئًا وتقفِل البث عليه. عندما يكون البث مقفلًا، لا يمكن الحصول على قارئ آخر إلى أن يتم تحرير هذا القارئ.
تُعرِض طريقة read()
لواجهة ReadableStreamDefaultReader
وعدًا يسمح بالوصول إلى القطعة التالية
في قائمة الانتظار الداخلية للبث. يتم تنفيذ الطلب أو رفضه بنتيجة استنادًا إلى حالة
البث. في ما يلي الاحتمالات المختلفة:
- إذا كان هناك قطعة متاحة، سيتم تنفيذ الوعد بعنصر من الشكل
{ value: chunk, done: false }
. - إذا أصبح مصدر البيانات مغلقًا، سيتم تنفيذ الوعد باستخدام عنصر من النوع
{ value: undefined, done: true }
. - إذا حدث خطأ في البث، سيتم رفض الوعد مع تضمين الخطأ ذي الصلة.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
سمة locked
يمكنك التحقّق مما إذا كان مصدر بيانات قابل للقراءة مُقفَلًا من خلال الانتقال إلى
ReadableStream.locked
موقعه.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
عيّنات رمز بث سهل القراءة
يوضّح نموذج الرمز البرمجي أدناه جميع الخطوات أثناء تنفيذها. عليك أولاً إنشاء ReadableStream
تحدِّد في underlyingSource
(أي فئة TimestampSource
) طريقة start()
.
تُطلب من خلال هذه الطريقة من controller
البث
enqueue()
طابعًا زمنيًا كل ثانية لمدة عشر ثوانٍ.
أخيرًا، يطلب من وحدة التحكّم close()
البث. يمكنك استخدام هذا
البث من خلال إنشاء قارئ باستخدام الطريقة getReader()
واستدعاء read()
إلى أن يصبح البث
done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
التكرار غير المتزامن
قد لا تكون واجهة برمجة التطبيقات الأكثر ملاءمةً هي التحقّق من كل تكرار read()
للحلقة إذا كان البث done
.
لحسن الحظ، ستتوفّر قريبًا طريقة أفضل لإجراء ذلك: التكرار غير المتزامن.
for await (const chunk of stream) {
console.log(chunk);
}
إنّ أحد الحلول البديلة لاستخدام التكرار غير المتزامن اليوم هو تنفيذ السلوك باستخدام polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
إنشاء بث قابل للقراءة
تُنشئ طريقة tee()
لواجهة
ReadableStream
البث القابل للقراءة الحالي، وتُعرِض صفيفًا من عنصرَين
يحتوي على الفرعين الناتجَين كمثيلَين جديدَين من ReadableStream
. يتيح ذلك لقراءين قراءة بث في الوقت نفسه. يمكنك إجراء ذلك، على سبيل المثال، في مشغّل خدمة إذا
كنت تريد جلب استجابة من الخادم وبثّها إلى المتصفّح، ولكن أيضًا بثّها إلى
التخزين المؤقت لمشغّل الخدمة. بما أنّه لا يمكن استخدام نص الردّ أكثر من مرّة، تحتاج إلى نسختَين للقيام بذلك. لإلغاء البث، عليك إلغاء كلا الفرعين الناتجَين. سيؤدي بدء بث محتوى
إلى قفله بشكل عام طوال المدة، ما يمنع القرّاء الآخرين من قفله.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
تدفّقات وحدات البايت القابلة للقراءة
بالنسبة إلى أحداث البث التي تمثّل وحدات البايت، يتوفّر إصدار موسّع من البث القابل للقراءة لمعالجة وحدات البايت بكفاءة، لا سيما من خلال تقليل عدد النُسخ. تتيح أحداث "بث المحتوى" اكتساب قراء "جلب المحتوى الخاص بك" (BYOB). يمكن أن يقدّم التنفيذ التلقائي مجموعة من المخرجات المختلفة، مثل السلاسل أو مخازن مصفوفات في حالة WebSockets، في حين تضمن تدفّقات البايتات إخراج البايتات. بالإضافة إلى ذلك، يستفيد قراء BYOB من مزايا الثبات. ويعود سبب ذلك إلى أنّه في حال فصل المخزن المؤقت، يمكن ضمان عدم الكتابة في المخزن المؤقت نفسه مرّتين، وبالتالي تجنُّب حالات السباق. يمكن لقراء BYOB تقليل عدد المرات التي يحتاج فيها المتصفّح إلى تنفيذ عملية جمع القمامة، لأنّه يمكنه إعادة استخدام ذاكرة التخزين المؤقت.
إنشاء بث وحدات بت قابلة للقراءة
يمكنك إنشاء بث بايتات قابل للقراءة عن طريق تمرير مَعلمة type
إضافية إلى الدالة الإنشائية
ReadableStream()
.
new ReadableStream({ type: 'bytes' });
فريق underlyingSource
يتم منح المصدر الأساسي لبث وحدات البايت القابلة للقراءة ReadableByteStreamController
للتلاعب به. تأخذ الطريقة ReadableByteStreamController.enqueue()
وسيطة chunk
تكون قيمتها
ArrayBufferView
. تعرض السمة ReadableByteStreamController.byobRequest
طلب سحب
BYOB الحالي، أو قيمة فارغة إذا لم يكن هناك طلب. أخيرًا، يعرض الحقل ReadableByteStreamController.desiredSize
الحجم المطلوب لملء "القائمة الداخلية" للبث الخاضع للتحكّم.
فريق queuingStrategy
الوسيطة الثانية، وهي اختيارية أيضًا، لدالة الإنشاء ReadableStream()
هي queuingStrategy
.
وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في "قائمة الانتظار" للبث، والتي تأخذ مَعلمة واحدة:
-
highWaterMark
: عدد غير سالب من وحدات البايت يشير إلى الحد الأقصى للبث باستخدام استراتيجية الانتظار هذه. يُستخدَم ذلك لتحديد الضغط الخلفي الذي يظهر من خلال السمةReadableByteStreamController.desiredSize
المناسبة. ويحدِّد أيضًا حالات استدعاء طريقةpull()
للمصدر الأساسي.
الطريقتان getReader()
وread()
يمكنك بعد ذلك الوصول إلى ReadableStreamBYOBReader
من خلال ضبط المَعلمة mode
وفقًا لذلك:
ReadableStream.getReader({ mode: "byob" })
. يتيح ذلك التحكّم بشكل أكثر دقة في تخصيص ملف التخزين المؤقت
لتجنُّب النُسخ. للقراءة من بث البايتات، عليك استدعاء
ReadableStreamBYOBReader.read(view)
، حيث يكون view
هو
ArrayBufferView
.
نموذج تعليمات برمجية لبث وحدات البايت القابلة للقراءة
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
تعرض الدالة التالية مصادر بيانات قابلة للقراءة تتيح قراءة صفيف تم إنشاؤه عشوائيًا بكفاءة بدون نسخ البيانات. بدلاً من استخدام حجم قطعة محدّد مسبقًا يبلغ 1,024، يحاول ملء المخازن المؤقتة التي يقدّمها المطوّر، ما يتيح التحكّم الكامل.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
آليات بث قابل للكتابة
البث القابل للكتابة هو وجهة يمكنك كتابة البيانات فيها، ويتم تمثيله في JavaScript بكائن
WritableStream
. ويعمل هذا العنصر
كعنصر تجريد على مستوى أعلى من وحدة معالجة بيانات أساسية، وهي وحدة معالجة بيانات I/O من المستوى الأدنى يتم فيها
كتابة البيانات الأولية.
يتم كتابة البيانات في البث من خلال كاتب، كل جزء في المرة الواحدة. يمكن أن تتّخذ الشريحة العديد من الأشكال، تمامًا مثل الشرائح في قارئ. يمكنك استخدام أي رمز تريده لإنشاء المقاطع الجاهزة للكتابة. ويُطلق على الكاتب والرمز المرتبط به اسم منتج.
عند إنشاء كاتب وبدء الكتابة في بث (كاتب نشط)، يُقال أنّه مُقفَل عليه. يمكن لكاتب واحد فقط الكتابة في بث قابل للكتابة في المرة الواحدة. إذا أردت أن يبدأ كاتب آخر في كتابة محتوى في البث، عليك عادةً تحريره قبل ربط كاتب آخر به.
تتتبّع القائمة الداخلية الأجزاء التي تمّت كتابتها إلى البث ولكن لم تتم معالجتها بعد من خلال وحدة الاستقبال الأساسية.
استراتيجية وضع المحتوى في قائمة الانتظار هي عنصر يحدّد كيفية إرسال بث إشارة الضغط الخلفي استنادًا إلى حالة قائمة الانتظار الداخلية. تحدّد استراتيجية وضع المحتوى في "قائمة المحتوى التالي" حجمًا لكل جزء، وتقارن بين الحجم الإجمالي لجميع الأجزاء في "قائمة المحتوى التالي" وعدد محدّد يُعرف باسم الحد الأقصى المسموح به.
يُطلق على البنية النهائية اسم عنصر تحكّم. يحتوي كل مصدر بيانات قابل للكتابة على وحدة تحكّم مرتبطة تسمح لك بالتحكم في مصدر البيانات (على سبيل المثال، لإيقافه).
إنشاء بث قابل للكتابة
توفّر واجهة WritableStream
في
واجهة برمجة التطبيقات Streams API نموذجًا معياريًا لتدوين بيانات البث إلى وجهة، تُعرف
باسم "الوجهة". يتضمّن هذا العنصر ميزة الضغط الخلفي وميزة وضع المحتوى في قائمة الانتظار. يمكنك إنشاء بث قابل للكتابة من خلال
استدعاء عنصر الإنشاء
WritableStream()
.
تتضمّن هذه الطريقة مَعلمة underlyingSink
اختيارية تمثّل عنصرًا
يتضمّن طُرقًا وسمات تحدّد سلوك مثيل البث الذي تم إنشاؤه.
فريق underlyingSink
يمكن أن يتضمّن underlyingSink
الطرق الاختيارية التالية التي يحدّدها المطوّر. المَعلمة controller
المُرسَلة إلى بعض الطرق هي
WritableStreamDefaultController
.
start(controller)
: يتمّ استدعاء هذه الطريقة على الفور عند إنشاء الكائن. يجب أن تهدف محتويات هذه الطريقة إلى الوصول إلى الحوض الأساسي. إذا كان من المفترض أن تتم هذه العملية بشكل غير متزامن، يمكن أن تُرجع وعدًا للإشارة إلى النجاح أو الفشل.write(chunk, controller)
: سيتمّ استدعاء هذه الطريقة عندما تكون مجموعة جديدة من البيانات (المحدّدة في المَعلمةchunk
) جاهزة للكتابة في المصرف الأساسي. ويمكن أن يعرض وعدًا لتحديد نجاح عملية الكتابة أو تعذّرها. لن يتمّ استدعاء هذه الطريقة إلّا بعد نجاح عمليات الكتابة السابقة، ولن يتمّ استدعاؤها مطلقًا بعد إغلاق البث أو إيقافه.close(controller)
: سيتمّ استدعاء هذه الطريقة إذا أرسَل التطبيق إشارة بأنّه قد انتهى من كتابة المقاطع إلى البث. يجب أن تُجري العناصر ما يلزم لإنهاء عمليات الكتابة إلى الوحدة الأساسية لمعالجة البيانات، وإلغاء إمكانية الوصول إليها. إذا كانت هذه العملية غير متزامنة، يمكنها عرض وعد لإشارة إلى النجاح أو الفشل. لن يتمّ استدعاء هذه الطريقة إلا بعد نجاح كل عمليات الكتابة التي تمّ وضعها في "قائمة الانتظار".abort(reason)
: سيتمّ استدعاء هذه الطريقة إذا أراد التطبيق إغلاق البث فجأة ووضعه في حالة خطأ. ويمكنه تنظيف أي موارد محجوزة، تمامًا مثلclose()
، ولكن سيتم استدعاءabort()
حتى إذا تم وضع عمليات الكتابة في قائمة الانتظار. وسيتم تجاهل هذه الأجزاء. إذا كانت هذه العملية غير متزامنة، يمكنها عرض وعد للإشارة إلى النجاح أو الفشل. تحتوي المَعلمةreason
علىDOMString
تصف سبب إيقاف البث.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
تمثل واجهة برمجة التطبيقات
WritableStreamDefaultController
Streams API وحدة تحكّم تتيح التحكّم في حالة WritableStream
أثناء الإعداد، أو عند إرسال المزيد من الأجزاء للكتابة، أو في نهاية الكتابة. عند إنشاء
WritableStream
، يتم منح الحوض الأساسي مثيلًا WritableStreamDefaultController
مقابلًا للتلاعب به. يحتوي WritableStreamDefaultController
على طريقة واحدة فقط:
WritableStreamDefaultController.error()
،
مما يؤدي إلى حدوث خطأ في أي تفاعلات مستقبلية مع البث المرتبط.
تتيح WritableStreamDefaultController
أيضًا استخدام السمة signal
التي تعرض مثيلًا من
AbortSignal
،
مما يسمح بإيقاف عملية WritableStream
إذا لزم الأمر.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
فريق queuingStrategy
الوسيطة الثانية، وهي اختيارية أيضًا، لدالة الإنشاء WritableStream()
هي queuingStrategy
.
وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في "قائمة المحتوى التالي" للبث، ويأخذ مَعلمتَين:
highWaterMark
: رقم غير سالب يشير إلى الحد الأقصى للبث باستخدام استراتيجية الانتظار هذه.size(chunk)
: دالة تحسب الحجم المحدّد غير السالب لقيمة الجزء المحدّدة وتعرضه. وتُستخدَم النتيجة لتحديد الضغط الخلفي الذي يظهر من خلال السمةWritableStreamDefaultWriter.desiredSize
المناسبة.
الطريقتان getWriter()
وwrite()
للكتابة في بث قابل للكتابة، تحتاج إلى كاتب، وسيكون WritableStreamDefaultWriter
. تُعرِض طريقة getWriter()
لواجهة WritableStream
مثيلًا جديدًا من WritableStreamDefaultWriter
وتُقفِل البث على هذا المثيل. عندما يكون تدفق الرسائل
مقفَلاً، لا يمكن الحصول على كاتب آخر إلى أن يتم تحرير الكاتب الحالي.
تُسجِّل طريقة write()
واجهة
WritableStreamDefaultWriter
مجموعة بيانات تم تمريرها في WritableStream
ووحدة المعالجة الأساسية، ثم تُعيد
وعدًا يتم حلّه للإشارة إلى نجاح عملية الكتابة أو فشلها. يُرجى العِلم أنّ معنى "النجاح" يعتمد على وحدة النقل الأساسية، فقد يشير ذلك إلى قبول المقطع، وليس بالضرورة أن يكون قد تم حفظه بأمان في وجهته النهائية.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
سمة locked
يمكنك التحقّق مما إذا كان مصدر بيانات قابل للكتابة قد تم قفله من خلال الوصول إلى
WritableStream.locked
موقعه.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
نموذج رمز بث Writable
يعرض نموذج الرمز البرمجي أدناه جميع الخطوات أثناء تنفيذها.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
توجيه بث قابل للقراءة إلى بث قابل للكتابة
يمكن توجيه بث قابل للقراءة إلى بث قابل للكتابة من خلال أسلوب
pipeTo()
للبث القابل للقراءة.
تُوجِّه ReadableStream.pipeTo()
القيمة الحالية ReadableStream
إلى WritableStream
معيّنة وتُعرِض وعدًا يتم تنفيذه عند اكتمال عملية توجيه البيانات بنجاح، أو يتم رفضه في حال حدثت أي أخطاء.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
إنشاء بث تحويل
تمثّل واجهة TransformStream
في Streams API مجموعة من البيانات القابلة للتحويل. يمكنك
إنشاء بث تحويل من خلال استدعاء الدالة الإنشائية TransformStream()
التي تنشئ كائن بث تحويل وتُرجعه
من معالِجات معيّنة. تقبل دالة الإنشاء TransformStream()
كوسيطة أولى عنصر JavaScript اختياريًا يمثّل transformer
. يمكن أن تحتوي هذه العناصر
على أيٍّ من الطرق التالية:
فريق transformer
start(controller)
: يتمّ استدعاء هذه الطريقة على الفور عند إنشاء الكائن. ويُستخدَم عادةً هذا الإجراء لإضافة أجزاء البادئة إلى "قائمة الانتظار" باستخدامcontroller.enqueue()
. وستتم قراءة هذه الأجزاء من الجانب القابل للقراءة، ولكن لا تعتمد على أي عمليات كتابة في الجانب القابل للكتابة. إذا كانت هذه العملية التمهيدية غير متزامنة، على سبيل المثال لأنّ الحصول على أجزاء البادئة يتطلّب بعض الجهد، يمكن للدالة عرض وعد للإشارة إلى النجاح أو الفشل. سيؤدي الوعد المرفوض إلى حدوث خطأ في السلسلة. ستعيد بنيةTransformStream()
طرح أي استثناءات يتم طرحها.transform(chunk, controller)
: يتمّ استدعاء هذه الطريقة عندما تكون مجموعة جديدة مكتوبة في الأساس على الجانب القابل للكتابة جاهزة للتحويل. يضمن تنفيذ البث عدم استدعاء هذه الدالة إلا بعد نجاح عمليات التحويل السابقة، ولن يتم استدعاؤها أبدًا قبل اكتمالstart()
أو بعد استدعاءflush()
. تُنفِّذ هذه الدالة عملية التحويل الفعلية لبث التحويل. ويمكنه إضافة النتائج إلى "قائمة الانتظار" باستخدامcontroller.enqueue()
. يسمح هذا الإجراء بكتابة قطعة واحدة في الجانب القابل للكتابة، ما يؤدي إلى عدم ظهور أيّ قطعة أو ظهور عدة قطع في الجانب القابل للقراءة، وذلك استنادًا إلى عدد مرّات استدعاءcontroller.enqueue()
. إذا كانت عملية التحويل غير متزامنة، يمكن أن تعرِض هذه الدالة وعدًا للإشارة إلى نجاح عملية التحويل أو فشلها. سيؤدي الوعد المرفوض إلى حدوث خطأ في كل من الجانبَين القابلَين للقراءة والكتابة من مجرى التحويل. في حال عدم تقديم طريقةtransform()
، يتم استخدام عملية تحويل الهوية التي تضيف الشرائح إلى "قائمة الانتظار" بدون تغيير من الجانب القابل للكتابة إلى الجانب القابل للقراءة.flush(controller)
: يتمّ استدعاء هذه الطريقة بعد أن يتم تحويل كلّ الأجزاء التي تمّ كتابتها على الجانب القابل للكتابة من خلال تمريرها بنجاح من خلالtransform()
، ويكون الجانب القابل للكتابة على وشك الإغلاق. ويُستخدَم هذا الإجراء عادةً لإدراج أجزاء اللاحقة في "قائمة الانتظار" على الجانب القابل للقراءة، قبل أن يصبح هذا الجانب أيضًا مغلقًا. إذا كانت عملية الفلاش غير متزامنة، يمكن للدالة عرض وعد لتحديد ما إذا كان الفلاش ناجحًا أو تعذّر إكماله، وسيتم إبلاغ المتصل بالدالةstream.writable.write()
بالنتيجة. بالإضافة إلى ذلك، سيؤدي الوعد المرفوض إلى حدوث خطأ في كلّ من الجانبَين القابلَين للقراءة والكتابة في البث. يتم التعامل مع طرح استثناء بالطريقة نفسها التي يتم بها عرض وعد مُرفوض.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
استراتيجيات وضع المحتوى في "قائمة المحتوى التالي" في writableStrategy
وreadableStrategy
المَعلمتان الثانيتان والثالثتان الاختياريتان لصانع TransformStream()
هما استراتيجيةَا الانتظار
writableStrategy
وreadableStrategy
. ويتم تحديدها على النحو الموضّح في القسمين
readable وwritable stream
على التوالي.
نموذج رمز بث التحويل
يعرض نموذج الرمز البرمجي التالي بثًا بسيطًا للتحويل.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
توجيه بث قابل للقراءة من خلال بث تحويل
توفّر طريقة pipeThrough()
لواجهة ReadableStream
طريقة قابلة للربط لتوجيه البث الحالي
من خلال بث تحويل أو أي زوج آخر قابل للكتابة/القراءة. سيؤدي توجيه بث إلى قفله بشكل عام
طوال مدة التوجيه، ما يمنع القرّاء الآخرين من قفله.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
يوضّح نموذج الرمز البرمجي التالي (المُعدّ بشكل مصطنع بعض الشيء) كيفية تنفيذ إصدار "صاخب" من fetch()
يحوّل النص إلى نص كبير باستخدام وعد الاستجابة المعروض
كبث
وتحويل كل جزء إلى نص كبير. وتتمثل ميزة هذا النهج في أنّك لست بحاجة إلى الانتظار إلى أن يتم تنزيل
المستند بأكمله، ما يمكن أن يحدث فرقًا كبيرًا عند التعامل مع الملفات الكبيرة.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
عرض توضيحي
يعرض المقطع التجريبي أدناه أحداث البث القابلة للقراءة والكتابة والتحويل أثناء تنفيذها. يتضمّن أيضًا أمثلة
على سلاسل الأنابيب pipeThrough()
وpipeTo()
، ويوضّح أيضًا tee()
. يمكنك اختياريًا تشغيل
الإصدار التجريبي في نافذته الخاصة أو عرض
رمز المصدر.
أحداث بث مفيدة متاحة في المتصفّح
هناك عدد من مصادر البيانات المفيدة المضمّنة في المتصفّح مباشرةً. يمكنك بسهولة إنشاء
ReadableStream
من قطعة بيانات. تُرجع طريقة stream() في واجهة Blob
ReadableStream
الذي يعرض عند القراءة البيانات المضمّنة في العنصر المصغّر. تذكَّر أيضًا أنّ كائن
File
هو نوع معيّن من
Blob
، ويمكن استخدامه في أي سياق يمكن استخدام ملفّ نصي فيه.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
يُطلق على الصيغ المخصّصة للبث من TextDecoder.decode()
وTextEncoder.encode()
اسم
TextDecoderStream
و
TextEncoderStream
على التوالي.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
يمكنك بسهولة ضغط ملف أو فك ضغطه باستخدام بثَي التحويل
CompressionStream
و
DecompressionStream
على التوالي. يوضّح نموذج الرمز البرمجي أدناه كيفية تنزيل مواصفات Streams وضغطها (باستخدام gzip)
مباشرةً في المتصفّح وكتابة الملف المضغوط على القرص مباشرةً.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
إنّ FileSystemWritableFileStream
وfetch()
تدفّقات الطلبات التجريبية في File System Access API هي مثالان على تدفّقات البيانات القابلة للكتابة في الوقت الفعلي.
تستخدِم Serial API بشكل كبير كلّ من مصادر البيانات القابلة للقراءة والكتابة.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
أخيرًا، تدمج واجهة برمجة تطبيقات WebSocketStream
أحداث البث مع واجهة برمجة تطبيقات WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
مراجع مفيدة
- مواصفات ساحات المشاركات
- العروض التوضيحية المصاحبة
- Streams polyfill
- 2016: عام مصادر البيانات على الويب
- العناصر المتكرّرة والمنشِئين غير المتزامنين
- أداة تحليل أداء البث
الشكر والتقدير
تمت مراجعة هذه المقالة من قِبل Jake Archibald، François Beaufort، Sam Dutton، Mattias Buelens، Surma، Joe Medley، Adam Rice. ساعدتني منشورات المدونة التي نشرها Jake Archibald كثيرًا في فهم البث المباشر. استُوحيت بعض نماذج الرموز البرمجية من استكشافات مستخدم GitHub @bellbind، وتمت برمجة أجزاء من النثر بشكل كبير استنادًا إلى مستندات الويب MDN على Streams. لقد بذل مؤلفو معيار أحداث البث جهدًا رائعًا في كتابة هذه المواصفات. الصورة الرئيسية من إنشاء ريان لارا على Unsplash.