أحداث البث: الدليل النهائي

تعرَّف على كيفية استخدام مصادر البيانات القابلة للقراءة والكتابة والتحويل باستخدام واجهة برمجة التطبيقات Streams API.

تتيح لك Streams API الوصول آليًا إلى مصادر بيانات يتم استلامها عبر الشبكة أو إنشاؤها بأي وسيلة على الجهاز ومعالجتها باستخدام JavaScript. يتضمن البث تقسيم مورد تريد تلقّيه أو إرساله أو تحويله إلى أجزاء صغيرة، ثم معالجة هذه الأجزاء تدريجيًا. على الرغم من أنّ البث المباشر هو ميزة توفّرها المتصفّحات بشكلٍ تلقائي عند تلقّي مواد عرض مثل HTML أو الفيديوهات التي سيتم عرضها على صفحات الويب، لم تكن هذه الميزة متاحة مطلقًا في JavaScript قبل أن يتم طرح fetch مع أحداث البث في عام 2015.

في السابق، إذا كنت تريد معالجة مورد من نوع ما (سواء كان فيديو أو ملفًا نصيًا وما إلى ذلك)، كان عليك تنزيل الملف بأكمله والانتظار حتى يتم إلغاء تسلسله إلى تنسيق مناسب، ثم معالجته. وكل ذلك يتغيّر، مع توفُّر مجموعات البث بلغة JavaScript. يمكنك الآن معالجة البيانات الأولية باستخدام JavaScript تدريجيًا فور توفُّرها للعميل، بدون الحاجة إلى إنشاء مخزن مؤقت أو سلسلة أو كائن ثنائي كبير (blob). ويساعد ذلك في الكشف عن بعض حالات الاستخدام، وأدرجتُ بعضها أدناه:

  • تأثيرات الفيديو: لتوجيه بث فيديو قابل للقراءة من خلال بث تحويل يطبّق التأثيرات في الوقت الفعلي.
  • (فك) ضغط البيانات: يتم توجيه مصدر ملفات من خلال مصدر تحويل (فك) يضغطه ويفك ضغطه بشكل انتقائي.
  • فك ترميز الصور: يتم توجيه تدفق استجابة HTTP من خلال تدفق تحويل يفك ترميز البايتات ويحوّلها إلى بيانات مخطّط بياني، ثم من خلال تدفق تحويل آخر يحوّل المخطّطات البيانية إلى ملفات بتنسيق PNG. إذا تم تثبيته داخل معالج fetch لمشغِّل الخدمات، سيتيح لك ذلك تعويض تنسيقات الصور الجديدة بشفافية، مثل AVIF.

دعم المتصفح

ReadableStream وWritableStream

توافق المتصفّح

  • Chrome: 43.
  • الحافة: 14.
  • Firefox: 65
  • Safari: الإصدار 10.1.

المصدر

TransformStream

دعم المتصفح

  • Chrome: 67
  • ‫Edge: 79
  • Firefox: 102.
  • Safari: الإصدار 14.1.

المصدر

المفاهيم الأساسية

قبل أن أقدّم لك تفاصيل عن الأنواع المختلفة من أحداث البث، سأقدّم لك بعض المفاهيم الأساسية.

أجزاء

القطعة هي جزء واحد من البيانات يتم كتابته في مصدر بيانات أو قراءته منه. يمكن أن يكون من أي نوع، ويمكن أن تحتوي أحداث البث على أجزاء من أنواع مختلفة. في معظم الأحيان، لن تكون المجموعة هي أصدق وحدة بيانات لبث معيّن. على سبيل المثال، قد يحتوي بث البايتات على أجزاء تتألف من 16 وحدة Uint8Arrayكيلوبايت، بدلاً من وحدات بايت فردية.

مصادر البيانات القابلة للقراءة

يمثّل مصدر البيانات القابل للقراءة مصدر بيانات يمكنك القراءة منه. بعبارة أخرى، تأتي البيانات من مصدر بيانات قابل للقراءة. على وجه التحديد، بث قابل للقراءة هو مثيل لفئة ReadableStream .

مصادر البيانات القابلة للكتابة

يمثّل مصدر البيانات القابل للكتابة وجهة للبيانات التي يمكنك الكتابة فيها. بعبارة أخرى، يتم إدخال البيانات في مصدر قابل للكتابة. على وجه التحديد، تيار قابل للكتابة هو مثيل لفئة WritableStream.

تحويل أحداث البث

تتكوّن مجموعة البث التحويلية من مجموعة بث: مصدر بيانات قابل للكتابة، يُعرف باسمه القابل للكتابة، وساحة مشاركات قابلة للقراءة تُعرَف باسم الجانب القابل للقراءة. يمكن تشبيه هذه الميزة بأحد المترجمين الفوريين الذين يترجمون من لغة إلى أخرى أثناء التحدث. بطريقة خاصة بمصدر البيانات المخصّص للتحويل، تؤدّي الكتابة إلى الجانب القابل للكتابة إلى إتاحة بيانات جديدة للقراءة من الجانب القابل للقراءة. في الواقع، يمكن استخدام أي عنصر يحتوي على السمة writable والسمة readable كبث تحويل. ولكن الفئة TransformStream العادية تسهّل إنشاء هذا الزوج المرتبط بشكل صحيح.

سلاسل الأنابيب

يتم استخدام مصادر البيانات بشكل أساسي من خلال توجيهها إلى بعضها. يمكن توجيه بث قابل للقراءة مباشرةً إلى بث قابل للكتابة، وذلك باستخدام الطريقة pipeTo() للبث القابل للقراءة، أو يمكن توجيهه من خلال بث واحد أو أكثر من عمليات التحويل أولاً، وذلك باستخدام الطريقة pipeThrough() للبث القابل للقراءة. ويُطلق على مجموعة من البثّات التي تم ربطها معًا بهذه الطريقة اسم سلسلة البث.

الضغط الخلفي

بعد إنشاء سلسلة قنوات، ستنشر إشارات بشأن سرعة تدفق الأجزاء من خلالها. إذا لم تتمكّن أي خطوة في السلسلة من قبول أجزاء بعد، يتم نشر إشارة للخلف من خلال سلسلة الأنابيب، إلى أن يتم في النهاية إبلاغ المصدر الأصلي بإيقاف إنتاج الأجزاء بهذه السرعة . تُعرف عملية تطبيع التدفق هذه باسم الضغط الخلفي.

تزلُّج

يمكن بدء بث قابل للقراءة (يُعرف باسم "T" الكبير) باستخدام الطريقة tee(). سيؤدي ذلك إلى قفل البث، أي لن يعود قابلاً للاستخدام مباشرةً، ولكن سيؤدي إلى إنشاء سلسلتَي بث جديدتَين، تُعرفان باسم الفروع، ويمكن استخدامهما بشكل مستقل. من المهم أيضًا ضبط البث مسبقًا لأنّه لا يمكن ترجيع البث أو إعادة تشغيله. وسنوضّح لك المزيد من المعلومات حول هذا الموضوع لاحقًا.

مخطّط بياني لسلسلة ممر يتكون من تدفق قابل للقراءة يأتي من طلب إلى واجهة برمجة تطبيقات الجلب ثم يتم توجيهه عبر دفق تحويل يتم ربط مخرجاته ثم يتم إرساله إلى المتصفح لأول بث ناتج قابل للقراءة وذاكرة التخزين المؤقت لعامل الخدمة في الدفق الثاني القابل للقراءة الناتج.
سلسلة ممر.

آليات البث القابل للقراءة

مصدر البيانات القابل للقراءة هو مصدر بيانات يتم تمثيله في JavaScript من خلال عنصر ReadableStream الذي ينبع من مصدر أساسي. تنشئ الدالة الإنشائية ReadableStream() كائن بث قابل للقراءة من معالِجات معيّنة، وتعرضه. هناك نوعان من المصادر الأساسية:

  • تُرسِل مصادر الإرسال البيانات إليك باستمرار عند الوصول إليها، ويكون بإمكانك بدء البث أو إيقافه مؤقتًا أو إلغاء الوصول إليه. وتشمل الأمثلة أحداث البث المباشر للفيديو أو الأحداث المُرسَلة من الخادم أو WebSockets.
  • تتطلّب مصادر السحب منك طلب البيانات منها صراحةً بعد الربط بها. وتشمل الأمثلة عمليات HTTP من خلال طلبات fetch() أو XMLHttpRequest.

تتم قراءة بيانات البث بشكل تسلسلي في أجزاء صغيرة تُعرف باسم المقاطع. ويُشار إلى المقاطع الموضوعة في ساحة المشاركات بإدراجها في قائمة الانتظار. وهذا يعني أنّها في انتظار المعالجة في قائمة الانتظار وجاهزة للقراءة. تتتبّع القائمة الداخلية الأجزاء التي لم تتم قراءتها بعد.

استراتيجية وضع المحتوى في قائمة الانتظار هي عنصر يحدّد كيفية إرسال بث إشارة الضغط الخلفي استنادًا إلى حالة قائمة الانتظار الداخلية. تحدِّد استراتيجية وضع المحتوى في "قائمة المحتوى التالي" حجمًا لكل جزء، وتقارن بين الحجم الإجمالي لجميع الأجزاء في "قائمة المحتوى التالي" وعدد محدّد يُعرف باسم الحد الأقصى المسموح به.

يقرأ قارئ الأجزاء داخل البث. يسترجع هذا القارئ البيانات في قطعة واحدة في كل مرة، ما يتيح لك إجراء أي نوع من العمليات التي تريدها عليها. يُطلق على القارئ بالإضافة إلى رمز المعالجة الآخر المصاحب له اسم مستهلك.

ويُطلق على البنية التالية في هذا السياق اسم وحدة التحكّم. يحتوي كل مصدر بيانات قابل للقراءة على عنصر تحكّم مرتبط به، كما يوحي الاسم، يتيح لك التحكّم في مصدر البيانات.

يمكن لقارئ واحد فقط قراءة مصدر بيانات في كل مرة. عند إنشاء قارئ وبدء قراءة مصدر بيانات، (أي أنّه يصبح قارئًا نشطًا)، يتم قفله عليه. إذا أردت أن يتولى قارئ آخر قراءة ساحة المشاركات، عليك عادةً إطلاق القارئ الأول قبل تنفيذ أي إجراء آخر (مع أنّه يمكنك بدء البث).

إنشاء مصدر بيانات قابل للقراءة

يمكنك إنشاء بث قابل للقراءة من خلال استدعاء الدالة الإنشائية ReadableStream(). يحتوي المُنشئ على وسيطة اختيارية underlyingSource، والتي تمثّل كائنًا يتضمّن طُرقًا وسمات تحدّد سلوك مثيل البث الذي تم إنشاؤه.

فريق underlyingSource

ويمكن أن يستخدِم هذا الإجراء الطرق الاختيارية التالية التي يحدّدها المطوّر:

  • start(controller): يتم استدعاؤه فورًا عند إنشاء العنصر. يمكن للطريقة الوصول إلى مصدر البث وتنفيذ أي إجراء آخر مطلوب لإعداد وظيفة البث. إذا تم إجراء هذه العملية بشكل غير متزامن، يمكن أن توضح الطريقة وعدًا للإشارة إلى النجاح أو الفشل. المعلَمة controller التي تم تمريرها إلى هذه الطريقة هي a ReadableStreamDefaultController.
  • pull(controller): يمكن استخدامها للتحكّم في البث أثناء جلب المزيد من الأجزاء. ويتم استدعاؤها بشكل متكرر طالما أن قائمة الانتظار الداخلية للأجزاء غير كاملة، حتى تصل قائمة الانتظار إلى علامتها المائية المرتفعة. إذا كانت نتيجة استدعاء pull() هي وعد، لن يتم استدعاء pull() مرة أخرى إلى أن يتم الوفاء بهذا الوعد. إذا تم رفض الوعد، سيظهر خطأ في البث.
  • cancel(reason): يتم الاتصال عندما يلغي مستهلك البث البث.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

تتيح ReadableStreamDefaultController الطرق التالية:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

فريق queuingStrategy

الوسيطة الثانية، وهي اختيارية أيضًا، لدالة الإنشاء ReadableStream() هي queuingStrategy. وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في "قائمة المحتوى التالي" للبث، ويأخذ مَعلمتَين:

  • highWaterMark: رقم غير سالب يشير إلى الحد الأقصى للبث باستخدام استراتيجية الانتظار هذه.
  • size(chunk): دالة تحسب الحجم المحدّد غير السالب لقيمة الجزء المحدّدة وتعرضه. تُستخدَم النتيجة لتحديد الضغط الخلفي الذي يظهر من خلال السمة ReadableStreamDefaultController.desiredSize المناسبة. ويحدِّد أيضًا وقت استدعاء طريقة pull() للمصدر الأساسي.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

الطريقتان getReader() وread()

للقراءة من مصدر قابل للقراءة، تحتاج إلى قارئ، وهو ReadableStreamDefaultReader. تنشئ الطريقة getReader() الخاصة بواجهة ReadableStream قارئًا وتقفل البث عليه. عندما يكون البث مقفلًا، لا يمكن الحصول على قارئ آخر إلى أن يتم تحرير هذا القارئ.

تُعرِض طريقة read() لواجهة ReadableStreamDefaultReader وعدًا يسمح بالوصول إلى القطعة التالية في قائمة الانتظار الداخلية للبث. يتم تنفيذ الطلب أو رفضه بنتيجة استنادًا إلى حالة البث. في ما يلي الاحتمالات المختلفة:

  • إذا كان هناك قطعة متاحة، سيتم تنفيذ الوعد بعنصر من الشكل
    { value: chunk, done: false }.
  • إذا أصبح مصدر البيانات مغلقًا، سيتم تنفيذ الوعد باستخدام عنصر من النوع
    { value: undefined, done: true }.
  • وإذا حدث خطأ في البث، سيتمّ رفض الوعد عرض رسالة الخطأ ذات الصلة.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

سمة locked

يمكنك التحقّق مما إذا كان بث قابل للقراءة قد تم قفله من خلال الانتقال إلى ReadableStream.locked موقعه.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

نماذج لرموز بث قابلة للقراءة

يوضّح نموذج الرمز البرمجي أدناه جميع الخطوات أثناء تنفيذها. عليك أولاً إنشاء ReadableStream تحدِّد في underlyingSource (أي فئة TimestampSource) طريقة start(). تُطلب من خلال هذه الطريقة من controller البث enqueue() طابعًا زمنيًا كل ثانية لمدة عشر ثوانٍ. أخيرًا، يطلب من وحدة التحكّم close() البث. يمكنك الاستفادة من هذا البث من خلال إنشاء قارئ باستخدام طريقة getReader() وطلب read() حتى يصبح البث done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

تكرار غير متزامن

قد لا تكون واجهة برمجة التطبيقات الأكثر ملاءمةً هي التحقّق من كل تكرار read() للحلقة إذا كان البث done. لحسن الحظ، ستتوفّر قريبًا طريقة أفضل لإجراء ذلك: التكرار غير المتزامن.

for await (const chunk of stream) {
  console.log(chunk);
}

إنّ أحد الحلول البديلة لاستخدام التكرار غير المتزامن اليوم هو تنفيذ السلوك باستخدام polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

إنشاء بث قابل للقراءة

تُنشئ طريقة tee() لواجهة ReadableStream البث القابل للقراءة الحالي، وتُعرِض صفيفًا من عنصرَين يحتوي على الفرعين الناتجَين كمثيلَين جديدَين من ReadableStream. يتيح ذلك لقراءين قراءة بث في الوقت نفسه. يمكنك إجراء ذلك، على سبيل المثال، في مشغّل خدمة إذا كنت تريد جلب استجابة من الخادم وبثّها إلى المتصفّح، ولكن أيضًا بثّها إلى التخزين المؤقت لمشغّل الخدمة. وبما أنّه لا يمكن استخدام نص الردّ أكثر من مرّة، ستحتاج إلى نسختَين للقيام بذلك. لإلغاء البث، ستحتاج إلى إلغاء كلا فرعيهما الناتجين. سيؤدي بدء بث محتوى إلى قفله بشكل عام طوال المدة المحددة، ما يمنع القرّاء الآخرين من قفله.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

تدفّقات وحدات البايت القابلة للقراءة

بالنسبة إلى أحداث البث التي تمثّل وحدات البايت، يتوفّر إصدار موسّع من البث القابل للقراءة لمعالجة وحدات البايت بكفاءة، لا سيما من خلال تقليل عدد النُسخ. تتيح أحداث "بث المحتوى" استخدام ميزة "جلب المساحة التخزينية الخاصة بك" (BYOB) من أجل اكتساب القرّاء. يمكن أن يقدّم التنفيذ التلقائي مجموعة من المخرجات المختلفة، مثل السلاسل أو مخازن مصفوفات في حال استخدام WebSockets، في حين تضمن تدفّقات البايتات إخراج البايتات. بالإضافة إلى ذلك، يستفيد قراء BYOB من مزايا الثبات. ويعود سبب ذلك إلى أنّه في حال فصل المخزن المؤقت، يمكن ضمان عدم الكتابة في المخزن المؤقت نفسه مرّتين، وبالتالي تجنُّب حالات السباق. يمكن لقراء BYOB تقليل عدد المرات التي يحتاج فيها المتصفّح إلى تنفيذ عملية جمع القمامة، لأنّه يمكنه إعادة استخدام ذاكرة التخزين المؤقت.

إنشاء بث وحدات بت قابلة للقراءة

يمكنك إنشاء بث بايتات قابل للقراءة عن طريق تمرير مَعلمة type إضافية إلى الدالة الإنشائية ReadableStream().

new ReadableStream({ type: 'bytes' });

فريق underlyingSource

يتم منح المصدر الأساسي لبث وحدات البايت القابلة للقراءة ReadableByteStreamController للتلاعب به. تأخذ الطريقة ReadableByteStreamController.enqueue() وسيطة chunk تكون قيمتها ArrayBufferView. تعرض السمة ReadableByteStreamController.byobRequest طلب سحب BYOB الحالي، أو قيمة فارغة إذا لم يكن هناك طلب. أخيرًا، يعرض الحقل ReadableByteStreamController.desiredSize الحجم المطلوب لملء "القائمة الداخلية للبث الخاضع للتحكّم".

فريق queuingStrategy

الوسيطة الثانية، وهي اختيارية أيضًا، لدالة الإنشاء ReadableStream() هي queuingStrategy. وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في قائمة الانتظار للبث، والتي تأخذ مَعلمة واحدة:

  • highWaterMark: عدد غير سالب من وحدات البايت يشير إلى الحد الأقصى للبث باستخدام استراتيجية الانتظار هذه. يُستخدَم ذلك لتحديد الضغط الخلفي الذي يظهر من خلال سمة ReadableByteStreamController.desiredSize المناسبة. ويحدِّد أيضًا حالات استدعاء طريقة pull() للمصدر الأساسي.

الطريقتان getReader() وread()

يمكنك بعد ذلك الوصول إلى ReadableStreamBYOBReader من خلال ضبط المَعلمة mode وفقًا لذلك: ReadableStream.getReader({ mode: "byob" }). يتيح ذلك التحكّم بشكل أكثر دقة في تخصيص ملف التخزين المؤقت لتجنُّب النُسخ. للقراءة من بث البايتات، عليك استدعاء ReadableStreamBYOBReader.read(view)، حيث يكون view هو ArrayBufferView.

نموذج تعليمات برمجية لبث وحدات البايت القابلة للقراءة

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

تعرض الدالة التالية مصادر بيانات قابلة للقراءة تتيح قراءة صفيف تم إنشاؤه عشوائيًا بكفاءة بدون نسخ البيانات. بدلاً من استخدام حجم قطعة محدّد مسبقًا يبلغ 1,024، يحاول ملء المخازن المؤقتة التي يقدّمها المطوّر، ما يتيح التحكّم الكامل.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

آليات البث القابل للكتابة

البث القابل للكتابة هو وجهة يمكنك كتابة البيانات فيها، ويتم تمثيله في JavaScript بكائن WritableStream. ويعمل هذا العنصر كعنصر تجريد على مستوى أعلى من وحدة معالجة بيانات أساسية، وهي وحدة معالجة بيانات I/O من المستوى الأدنى يتم فيها كتابة البيانات الأولية.

يتم كتابة البيانات في البث من خلال كاتب، كل جزء في المرة الواحدة. يمكن أن يتخذ المقطع عدة أشكال، تمامًا مثل القطع الموجودة في القارئ. يمكنك استخدام أي رمز تريده لإنشاء المقاطع الجاهزة للكتابة. ويُطلق على الكاتب والرمز المرتبط به اسم منتج.

عند إنشاء كاتب وبدء الكتابة في بث (كاتب نشط)، يُقال أنّه مُقفَل عليه. يمكن لكاتب واحد فقط الكتابة في بث قابل للكتابة في المرة الواحدة. إذا أردت أن يبدأ كاتب آخر في كتابة محتوى في البث، عليك عادةً تحريره قبل إرفاق كاتب آخر به.

تتتبّع قائمة الانتظار الداخلية المقاطع التي تمت كتابتها في ساحة المشاركات ولكن لم تتم معالجتها بعد من خلال الحوض الأساسي.

استراتيجية وضع المحتوى في قائمة الانتظار هي عنصر يحدّد كيفية إرسال بث إلى ضغط خلفي استنادًا إلى حالة قائمة الانتظار الداخلية. تحدّد استراتيجية وضع المحتوى في "قائمة المحتوى التالي" حجمًا لكل جزء، وتقارن بين الحجم الإجمالي لجميع الأجزاء في "قائمة المحتوى التالي" وعدد محدّد يُعرف باسم الحد الأقصى المسموح به.

يُطلق على البنية النهائية اسم عنصر تحكّم. يحتوي كل مصدر بيانات قابل للكتابة على وحدة تحكّم مرتبطة تسمح لك بالتحكم في مصدر البيانات (على سبيل المثال، لإيقافه).

إنشاء بث قابل للكتابة

توفّر واجهة WritableStream في واجهة برمجة التطبيقات Streams API نموذجًا معياريًا لتدوين بيانات البث إلى وجهة، تُعرف باسم "الوجهة". يتضمّن هذا العنصر ميزة الضغط الخلفي وميزة وضع المحتوى في قائمة الانتظار. يمكنك إنشاء بث قابل للكتابة من خلال استدعاء عنصر الإنشاء WritableStream(). تتضمّن هذه الطريقة مَعلمة underlyingSink اختيارية تمثّل عنصرًا يتضمّن طُرقًا وسمات تحدّد سلوك مثيل البث الذي تم إنشاؤه.

فريق underlyingSink

يمكن أن تتضمّن underlyingSink الطرق الاختيارية التالية التي يحدِّدها المطوِّر. المَعلمة controller المُرسَلة إلى بعض الطرق هي WritableStreamDefaultController.

  • start(controller): يتمّ استدعاء هذه الطريقة على الفور عند إنشاء الكائن. يجب أن تهدف محتويات هذه الطريقة إلى الوصول إلى الحوض الأساسي. إذا كانت هذه العملية ستتم بشكل غير متزامن، فقد يكون ذلك وعدًا بالإشارة إلى النجاح أو الفشل.
  • write(chunk, controller): سيتمّ استدعاء هذه الطريقة عندما تكون مجموعة جديدة من البيانات (المحدّدة في المَعلمة chunk) جاهزة للكتابة في وحدة الاستقبال الأساسية. ويمكن أن يعرض وعدًا لتحديد نجاح عملية الكتابة أو تعذّرها. لن يتمّ استدعاء هذه الطريقة إلا بعد نجاح عمليات الكتابة السابقة، ولن يتمّ استدعاؤها مطلقًا بعد إغلاق البث أو إيقافه.
  • close(controller): سيتمّ استدعاء هذه الطريقة إذا أرسَل التطبيق إشارة بأنّه قد انتهى من كتابة المقاطع إلى البث. يجب أن تُجري العناصر ما يلزم لإنهاء عمليات الكتابة إلى الوحدة الأساسية لمعالجة البيانات، وإلغاء إمكانية الوصول إليها. إذا كانت هذه العملية غير متزامنة، يمكنها عرض وعد لإشارة إلى النجاح أو الفشل. لن يتمّ استدعاء هذه الطريقة إلا بعد نجاح كل عمليات الكتابة التي تمّ وضعها في "قائمة الانتظار".
  • abort(reason): سيتمّ استدعاء هذه الطريقة إذا أراد التطبيق إغلاق البث فجأة ووضعه في حالة خطأ. ويمكنه تنظيف أي موارد محجوزة، تمامًا مثل close()، ولكن سيتم استدعاء abort() حتى إذا تم وضع عمليات الكتابة في قائمة الانتظار. وسيتم تجاهل هذه الأجزاء. إذا كانت هذه العملية غير متزامنة، يمكنها عرض وعد للإشارة إلى النجاح أو الفشل. تحتوي المَعلمة reason على DOMString يصف سبب إيقاف البث.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

تمثل واجهة برمجة التطبيقات WritableStreamDefaultController Streams API وحدة تحكّم تتيح التحكّم في حالة WritableStream أثناء الإعداد، أو عند إرسال المزيد من الأجزاء للكتابة، أو في نهاية الكتابة. عند إنشاء WritableStream، يتم منح الحوض الأساسي مثيل WritableStreamDefaultController مقابل للتعامل معه. يحتوي WritableStreamDefaultController على طريقة واحدة فقط: WritableStreamDefaultController.error()، مما يؤدي إلى حدوث خطأ في أي تفاعلات مستقبلية مع البث المرتبط. تتيح WritableStreamDefaultController أيضًا استخدام السمة signal التي تعرض مثيلًا من AbortSignal، مما يتيح إيقاف عملية WritableStream إذا لزم الأمر.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

فريق queuingStrategy

الوسيطة الثانية، وهي اختيارية أيضًا، لدالة الإنشاء WritableStream() هي queuingStrategy. وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في "قائمة المحتوى التالي" للبث، ويأخذ مَعلمتَين:

  • highWaterMark: رقم غير سالب يشير إلى الحد الأقصى للبث باستخدام استراتيجية الانتظار هذه.
  • size(chunk): دالة تحسب الحجم المحدّد غير السالب لقيمة الجزء المحدّدة وتعرضه. وتُستخدَم النتيجة لتحديد الضغط الخلفي الذي يظهر من خلال السمة WritableStreamDefaultWriter.desiredSize المناسبة.

الطريقتان getWriter() وwrite()

للكتابة في بث قابل للكتابة، تحتاج إلى كاتب، وسيكون WritableStreamDefaultWriter. تُعرِض طريقة getWriter() لواجهة WritableStream مثيلًا جديدًا من WritableStreamDefaultWriter وتُقفِل البث على هذا المثيل. عندما يكون تدفق الرسائل مقفَلاً، لا يمكن الحصول على كاتب آخر إلى أن يتم تحرير الكاتب الحالي.

تُسجِّل طريقة write() واجهة WritableStreamDefaultWriter مجموعة بيانات تم تمريرها في WritableStream ووحدة المعالجة الأساسية، ثم تُعيد وعدًا يتم حلّه للإشارة إلى نجاح عملية الكتابة أو فشلها. يُرجى العِلم أنّ معنى كلمة "النجاح" يعود إلى الحوض الأساسي، ما قد يشير إلى قبول المقطع، وليس بالضرورة حفظه بأمان في وجهته النهائية.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

سمة locked

يمكنك التحقّق مما إذا كان مصدر بيانات قابل للكتابة قد تم قفله من خلال الوصول إلى WritableStream.locked موقعه.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

نموذج رمز بث Writable

يعرض نموذج الرمز البرمجي أدناه جميع الخطوات أثناء تنفيذها.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

توجيه بث قابل للقراءة إلى بث قابل للكتابة

يمكن توجيه بث قابل للقراءة إلى بث قابل للكتابة من خلال طريقة pipeTo() للبث القابل للقراءة. تُوجِّه ReadableStream.pipeTo() القيمة الحالية ReadableStreamإلى WritableStream معيّنة وتُعرِض وعدًا يتم تنفيذه عند اكتمال عملية توجيه البيانات بنجاح، أو يتم رفضه في حال حدثت أي أخطاء.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

إنشاء تدفق تحويل

تمثّل واجهة TransformStream في Streams API مجموعة من البيانات القابلة للتحويل. ويمكنك إنشاء بث تحويل عن طريق استدعاء الدالة الإنشائية TransformStream()، التي تنشئ وتُرجع كائن دفق تحويل من المعالِجات المحددة. تقبل دالة الإنشاء TransformStream() كوسيطة أولى عنصر JavaScript اختياريًا يمثّل transformer. يمكن أن تحتوي هذه العناصر على أيٍّ من الطرق التالية:

فريق transformer

  • start(controller): يتم استدعاء هذه الطريقة على الفور عند إنشاء الكائن. ويُستخدَم عادةً هذا الإجراء لإضافة أجزاء البادئة إلى "قائمة الانتظار" باستخدام controller.enqueue(). وستتم قراءة هذه الأجزاء من الجانب القابل للقراءة، ولكن لا تعتمد على أي عمليات كتابة في الجانب القابل للكتابة. إذا كانت هذه العملية الأولية غير متزامنة، مثلاً لأنّ الأمر يتطلب بعض الجهد للحصول على مقاطع البادئة، يمكن أن تعِد الدالة بالإشارة إلى النجاح أو الفشل، ما سيؤدي إلى خطأ في البث غير الوعد به. ستعيد بنية TransformStream() طرح أي استثناءات يتم طرحها.
  • transform(chunk, controller): يتمّ استدعاء هذه الطريقة عندما تكون مجموعة جديدة مكتوبة في الأصل على الجانب القابل للكتابة جاهزة للتحويل. يضمن تنفيذ البث عدم استدعاء هذه الدالة إلا بعد نجاح عمليات التحويل السابقة، ولن يتم استدعاؤها أبدًا قبل اكتمال start() أو بعد استدعاء flush(). تُنفِّذ هذه الدالة عملية التحويل الفعلية لبث التحويل. ويمكنه إضافة النتائج إلى "قائمة الانتظار" باستخدام controller.enqueue(). يسمح هذا بمقطع واحد مكتوب على الجانب القابل للكتابة أن ينتج عنه مقاطع صفرية أو متعددة على الجانب القابل للقراءة، بناءً على عدد مرات استدعاء controller.enqueue(). إذا كانت عملية التحويل غير متزامنة، يمكن أن تعرِض هذه الدالة وعدًا للإشارة إلى نجاح عملية التحويل أو فشلها. سيؤدي الوعد المرفوض إلى حدوث خطأ في كل من الجانبَين القابلَين للقراءة والكتابة من مجرى التحويل. إذا لم يتم توفير طريقة transform()، يتم استخدام تحويل الهوية، والذي يُدرِج المقاطع بدون تغيير من الجانب القابل للكتابة إلى الجانب القابل للقراءة.
  • flush(controller): يتمّ استدعاء هذه الطريقة بعد أن يتم تحويل كلّ الأجزاء التي تمّ كتابتها على الجانب القابل للكتابة من خلال تمريرها بنجاح من خلال transform()، ويكون الجانب القابل للكتابة على وشك الإغلاق. ويُستخدَم هذا الإجراء عادةً لإدراج أجزاء اللاحقة في "قائمة الانتظار" على الجانب القابل للقراءة، قبل أن يصبح هذا الجانب أيضًا مغلقًا. إذا كانت عملية الفلاش غير متزامنة، يمكن للدالة عرض وعد لتحديد ما إذا كان الفلاش ناجحًا أو تعذّر إكماله، وسيتم إبلاغ المتصل بالدالة stream.writable.write() بالنتيجة. بالإضافة إلى ذلك، سيؤدي الوعد المرفوض إلى حدوث خطأ في كلّ من الجانبَين القابلَين للقراءة والكتابة في البث. يتم التعامل مع طرح استثناء بالطريقة نفسها التي يتم بها عرض وعد مُرفوض.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

استراتيجيتا writableStrategy وreadableStrategy لإضافة المحتوى إلى قائمة المحتوى التالي

المَعلمتان الثانيتان والثالثتان الاختياريتان لصانع TransformStream() هما استراتيجيةَا الانتظار writableStrategy وreadableStrategy. ويتم تحديدها على النحو الموضّح في القسمين readable وwritable stream على التوالي.

نموذج رمز بث التحويل

يعرض نموذج الرمز البرمجي التالي تدفق تحويل بسيط قيد التنفيذ.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

توجيه تدفق قابل للقراءة من خلال دفق تحويل

توفّر طريقة pipeThrough() لواجهة ReadableStream طريقة قابلة للربط لتوجيه البث الحالي من خلال بث تحويل أو أي زوج آخر قابل للكتابة/القراءة. سيؤدي توجيه بث إلى قفله بشكل عام طوال مدة التوجيه، ما يمنع القرّاء الآخرين من قفله.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

يوضّح نموذج الرمز البرمجي التالي (المُعدّ بشكل مصطنع بعض الشيء) كيفية تنفيذ إصدار "صاخب" من fetch() يحوّل النص إلى نص كبير باستخدام وعد الاستجابة المعروض كبث وتحويل كل جزء إلى نص كبير. تتمثل ميزة هذا الأسلوب في أنك لا تحتاج إلى الانتظار حتى يتم تنزيل المستند بأكمله، مما قد يحدث فرقًا كبيرًا عند التعامل مع الملفات الكبيرة.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

عرض توضيحي

يتضمّن العرض التوضيحي أدناه أحداثًا قابلة للقراءة وقابلة للكتابة وتحويلها عمليًا. يتضمّن أيضًا أمثلة على سلاسل الأنابيب pipeThrough() وpipeTo()، ويوضّح أيضًا tee(). يمكنك اختياريًا تشغيل العرض التجريبي في نافذته الخاصة أو عرض رمز المصدر.

أحداث بث مفيدة متاحة في المتصفّح

هناك عدد من مصادر البيانات المفيدة المضمّنة في المتصفّح مباشرةً. يمكنك بسهولة إنشاء ReadableStream من كائن ثنائي كبير. تؤدي طريقة stream() الخاصة بواجهة Blob إلى عرض ReadableStream، وعند القراءة، يتم عرض البيانات المضمَّنة في الكائن الثنائي الكبير (blob). تذكَّر أيضًا أنّ كائن File هو نوع معيّن من Blob، ويمكن استخدامه في أي سياق يمكن استخدام ملفّ نصي فيه.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

يُطلق على كل من TextDecoder.decode() وTextEncoder.encode() اسم TextDecoderStream و TextEncoderStream على التوالي.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

يمكنك بسهولة ضغط ملف أو فك ضغطه باستخدام بثَي التحويل CompressionStream و DecompressionStream على التوالي. يوضح نموذج الرمز أدناه كيفية تنزيل مواصفات Streams وضغطها (gzip) مباشرةً في المتصفح وكتابة الملف المضغوط على القرص مباشرةً.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

إنّ FileSystemWritableFileStream FileSystemWritableFileStream ومصادر طلبات fetch() التجريبية هي أمثلة على عمليات البث القابلة للكتابة على الإنترنت غير القابلة للكتابة عليها.

تستخدم Serial API عمليات البث القابلة للقراءة والقابلة للكتابة بشكل مكثّف.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

أخيرًا، تعمل واجهة برمجة التطبيقات WebSocketStream على دمج مصادر البيانات مع WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

مراجع مفيدة

شكر وتقدير

تمت مراجعة هذه المقالة بواسطة جاك أرشيبالد وفرانسوا بوفورت وسام دوتون وماتياس بولينز وسورما وجو ميدلي وآدم رايس. ساعدتني مشاركات المدونة التي نشرها Jake Archibald كثيرًا في فهم البث المباشر. استُوحيت بعض نماذج الرموز البرمجية من استكشافات مستخدم GitHub @bellbind، وتمت برمجة أجزاء من النثر بشكل كبير استنادًا إلى مستندات الويب على MDN في Streams. لقد بذل مؤلفو معيار أحداث البث جهدًا رائعًا في كتابة هذه المواصفات. الصورة الرئيسية من إنشاء ريان لارا على Unsplash.