أحداث البث: الدليل النهائي

تعرَّف على كيفية استخدام مصادر البيانات القابلة للقراءة والكتابة والتحويل باستخدام واجهة برمجة التطبيقات Streams API.

تتيح لك واجهة برمجة التطبيقات Streams API الوصول آليًا إلى مصادر البيانات التي يتم تلقّيها عبر الشبكة أو التي يتم إنشاؤها بأي وسيلة على الجهاز، ثم معالجتها باستخدام JavaScript. يتضمن البث تقسيم مورد تريد تلقّيه أو إرساله أو تحويله إلى أجزاء صغيرة، ثم معالجة هذه الأجزاء تدريجيًا. على الرغم من أنّ البث المباشر هو ميزة توفّرها المتصفّحات بشكلٍ تلقائي عند تلقّي مواد عرض مثل HTML أو الفيديوهات لعرضها على صفحات الويب، لم تكن هذه الميزة متاحة مطلقًا في JavaScript قبل أن يتم طرح fetch مع أحداث البث في عام 2015.

في السابق، إذا أردت معالجة مورد من نوع ما (سواء كان فيديو أو ملف نصي أو غير ذلك)، كان عليك تنزيل الملف بأكمله والانتظار إلى أن يتم تحويله إلى تنسيق مناسب، ثم معالجته. مع توفّر أحداث البث لسمة JavaScript، سيتغيّر كل ذلك. يمكنك الآن معالجة البيانات الأولية باستخدام JavaScript بشكل تدريجي بمجرد توفّرها على العميل، بدون الحاجة إلى إنشاء ذاكرة تخزين مؤقت أو سلسلة أو ملف نصي. يتيح ذلك عددًا من حالات الاستخدام، ونذكر بعضها أدناه:

  • تأثيرات الفيديو: يتم توجيه بث فيديو قابل للقراءة من خلال بث تحويل يطبّق تأثيرات في الوقت الفعلي.
  • (فك) ضغط البيانات: يتم توجيه مصدر ملفات من خلال مصدر تحويل (فك) يضغطه ويفك ضغطه بشكل انتقائي.
  • فك ترميز الصور: يتم توجيه تدفق استجابة HTTP من خلال تدفق تحويل يفك ترميز البايتات ويحوّلها إلى بيانات مخطّط بياني، ثم من خلال تدفق تحويل آخر يحوّل المخطّطات البيانية إلى ملفات بتنسيق PNG. في حال التثبيت داخل معالِج fetch لأحد موظّفي الخدمة، يتيح لك ذلك استخدام polyfill بشكل شفاف لتنسيقات الصور الجديدة، مثل AVIF.

دعم المتصفح

ReadableStream وWritableStream

توافق المتصفّح

  • Chrome: 43.
  • ‫Edge: 14
  • Firefox: 65
  • ‫Safari: 10.1

المصدر

TransformStream

توافق المتصفّح

  • Chrome: 67
  • ‫Edge: 79
  • Firefox: 102
  • Safari: 14.1

المصدر

المفاهيم الأساسية

قبل أن أقدّم لك تفاصيل عن الأنواع المختلفة من أحداث البث، سأقدّم لك بعض المفاهيم الأساسية.

أجزاء

القطعة هي جزء واحد من البيانات يتم كتابته في مصدر بيانات أو قراءته منه. يمكن أن يكون من أي نوع، ويمكن أن تحتوي أحداث البث على أجزاء من أنواع مختلفة. في معظم الأحيان، لن تكون المجموعة هي أصدق وحدة بيانات لبث معيّن. على سبيل المثال، قد يحتوي بث البايتات على أجزاء تتألف من 16 وحدة Uint8Arrayكيلوبايت، بدلاً من وحدات بايت فردية.

مصادر البيانات القابلة للقراءة

يمثّل مصدر البيانات القابل للقراءة مصدر بيانات يمكنك القراءة منه. بعبارة أخرى، تأتي البيانات من مصدر بيانات قابل للقراءة. على وجه التحديد، بث قابل للقراءة هو مثيل لفئة ReadableStream.

مصادر البيانات القابلة للكتابة

يمثّل مصدر البيانات القابل للكتابة وجهة للبيانات التي يمكنك الكتابة فيها. بعبارة أخرى، يتم إدخال البيانات في مصدر قابل للكتابة. على وجه التحديد، تيار قابل للكتابة هو مثيل لفئة WritableStream.

تحويل أحداث البث

يتكوّن مسار التحويل من زوج من المسارات: مسار قابل للكتابة، يُعرف باسم الجانب القابل للكتابة، ومسار قابل للقراءة، يُعرف باسم الجانب القابل للقراءة. يمكن تشبيه هذه الميزة بأحد المترجمين الفوريين الذين يترجمون من لغة إلى أخرى أثناء التحدث. بطريقة خاصة بمصدر البيانات المخصّص للتحويل، تؤدّي الكتابة إلى الجانب القابل للكتابة إلى إتاحة بيانات جديدة للقراءة من الجانب القابل للقراءة. على وجه التحديد، يمكن لأي عنصر يتضمّن سمة writable وسمة readable أن يُستخدَم كمجموعة بث تحويل. ومع ذلك، تسهِّل فئة TransformStream العادية إنشاء مثل هذا الزوج المتشابك بشكل صحيح.

سلاسل الأنابيب

يتم استخدام مصادر البيانات بشكل أساسي من خلال توجيهها إلى بعضها. يمكن توجيه بث قابل للقراءة مباشرةً إلى بث قابل للكتابة، وذلك باستخدام الطريقة pipeTo() للبث القابل للقراءة، أو يمكن توجيهه من خلال بث واحد أو أكثر من عمليات التحويل أولاً، وذلك باستخدام الطريقة pipeThrough() للبث القابل للقراءة. ويُطلق على مجموعة من البثّات التي تم ربطها معًا بهذه الطريقة اسم سلسلة البث.

الضغط الخلفي

بعد إنشاء سلسلة قنوات، ستنشر إشارات بشأن سرعة تدفق الأجزاء من خلالها. إذا لم تتمكّن أي خطوة في السلسلة من قبول أجزاء بعد، يتم نشر إشارة للخلف من خلال سلسلة الأنابيب، إلى أن يتم في النهاية إبلاغ المصدر الأصلي بإيقاف إنتاج الأجزاء بهذه السرعة. تُعرف عملية تطبيع التدفق هذه باسم الضغط الخلفي.

Teeing

يمكن بدء بث قابل للقراءة (يُسمى هذا الإجراء باسم شكل الحرف الكبير "T") باستخدام طريقة tee(). سيؤدي ذلك إلى قفل البث، أي لن يعود قابلاً للاستخدام مباشرةً، ولكن سيؤدي إلى إنشاء سلسلتَي بث جديدتَين، تُعرفان باسم الفروع، ويمكن استخدامهما بشكل مستقل. من المهم أيضًا ضبط البث مسبقًا لأنّه لا يمكن ترجيع البث أو إعادة تشغيله. وسنوضّح لك المزيد من المعلومات حول هذا الموضوع لاحقًا.

مخطّط بياني لسلسلة قنوات تتألّف من بث قابل للقراءة ناتج عن طلب إلى واجهة برمجة التطبيقات fetch API، ويتم توجيهه بعد ذلك من خلال بث تحويل يتم تقسيم مخرجاته ثم إرسالها إلى المتصفّح للبث القابل للقراءة الأول الناتج، وإلى ذاكرة التخزين المؤقت لعامل الخدمة للبث القابل للقراءة الثاني الناتج.
سلسلة أنابيب.

آليات البث القابل للقراءة

مصدر البيانات القابل للقراءة هو مصدر بيانات يتم تمثيله في JavaScript من خلال عنصر ReadableStream الذي يتدفق من مصدر أساسي. تنشئ الدالة الإنشائية ReadableStream() كائن بث قابل للقراءة من معالِجات معيّنة، وتعرضه. هناك نوعان من المصادر الأساسية:

  • تُرسِل مصادر الإرسال البيانات إليك باستمرار عند الوصول إليها، ويكون بإمكانك بدء البث أو إيقافه مؤقتًا أو إلغاء الوصول إليه. وتشمل الأمثلة أحداث البث المباشر للفيديو أو الأحداث المُرسَلة من الخادم أو WebSockets.
  • تتطلّب مصادر السحب منك طلب البيانات منها صراحةً بعد الربط بها. وتشمل الأمثلة عمليات HTTP من خلال طلبات fetch() أو XMLHttpRequest.

تتم قراءة بيانات البث بشكل تسلسلي في أجزاء صغيرة تُعرف باسم المقاطع. يُقال إنّ الأجزاء التي يتم وضعها في بث مُدرَجة في قائمة الانتظار. وهذا يعني أنّه في انتظار المراجعة. تتتبّع القائمة الداخلية الأجزاء التي لم تتم قراءتها بعد.

استراتيجية وضع المحتوى في قائمة الانتظار هي عنصر يحدّد كيفية إرسال بث إشارة الضغط الخلفي استنادًا إلى حالة قائمة الانتظار الداخلية. تحدّد استراتيجية وضع المحتوى في "قائمة المحتوى التالي" حجمًا لكل جزء، وتقارن بين الحجم الإجمالي لجميع الأجزاء في "قائمة المحتوى التالي" وعدد محدّد يُعرف باسم الحد الأقصى المسموح به.

يقرأ قارئ الأجزاء داخل البث. يسترجع هذا القارئ البيانات في قطعة واحدة في كل مرة، ما يتيح لك إجراء أي نوع من العمليات التي تريد إجراؤها عليها. يُطلق على القارئ بالإضافة إلى رمز المعالجة الآخر المصاحب له اسم مستهلك.

يُطلق على البنية التالية في هذا السياق اسم عنصر تحكّم. يحتوي كل مصدر بيانات قابل للقراءة على عنصر تحكّم مرتبط به، كما يوحي الاسم، يتيح لك التحكّم في مصدر البيانات.

يمكن لقارئ واحد فقط قراءة مصدر بيانات في كل مرة. عند إنشاء قارئ وبدء قراءة مصدر بيانات، (أي يصبح قارئًا نشطًا)، يتم قفله عليه. إذا أردت أن يتولى قارئ آخر قراءة البث، عليك عادةً إلغاء قراءة القارئ الأول قبل تنفيذ أي إجراء آخر (على الرغم من أنّه يمكنك تقسيم أحداث البث).

إنشاء مصدر بيانات قابل للقراءة

يمكنك إنشاء مصدر بيانات قابل للقراءة من خلال استدعاء عنصر الإنشاء ReadableStream(). يحتوي المُنشئ على وسيطة اختيارية underlyingSource، والتي تمثّل كائنًا يتضمّن طُرقًا وسمات تحدّد سلوك مثيل البث الذي تم إنشاؤه.

فريق underlyingSource

ويمكن أن يستخدِم هذا الإجراء الطرق الاختيارية التالية التي يحدّدها المطوّر:

  • start(controller): يتمّ استدعاؤه على الفور عند إنشاء العنصر. يمكن للطريقة الوصول إلى مصدر البث وتنفيذ أي إجراء آخر مطلوب لإعداد وظيفة البث. إذا كان من المفترض تنفيذ هذه العملية بشكل غير متزامن، يمكن للطريقة عرض وعد للإشارة إلى النجاح أو الفشل. المعلَمة controller التي تم تمريرها إلى هذه الطريقة هي a ReadableStreamDefaultController.
  • pull(controller): يمكن استخدامها للتحكّم في البث أثناء جلب المزيد من الأجزاء. ويتم استدعاؤه مراراً وتكراراً ما دامت قائمة الانتظار الداخلية للبث غير ممتلئة، إلى أن تصل قائمة الانتظار إلى الحد الأقصى. إذا كانت نتيجة استدعاء pull() هي وعد، لن يتم استدعاء pull() مرة أخرى إلى أن يتم الوفاء بهذا الوعد. إذا تم رفض الوعد، سيظهر خطأ في البث.
  • cancel(reason): يتمّ استدعاؤه عندما يلغي مستخدِم البث البث.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

تتيح ReadableStreamDefaultController الطرق التالية:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

فريق queuingStrategy

الوسيطة الثانية، وهي اختيارية أيضًا، لدالة الإنشاء ReadableStream() هي queuingStrategy. وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في "قائمة المحتوى التالي" للبث، ويأخذ مَعلمتَين:

  • highWaterMark: رقم غير سالب يشير إلى الحد الأقصى للبث باستخدام استراتيجية الانتظار هذه.
  • size(chunk): دالة تحسب الحجم المحدّد غير السالب لقيمة الجزء المحدّدة وتعرضه. وتُستخدَم النتيجة لتحديد الضغط الخلفي الذي يظهر من خلال السمة ReadableStreamDefaultController.desiredSize المناسبة. ويحدِّد أيضًا حالات استدعاء طريقة pull() للمصدر الأساسي.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

الطريقتان getReader() وread()

للقراءة من مصدر قابل للقراءة، تحتاج إلى قارئ، وهو ReadableStreamDefaultReader. تنشئ طريقة getReader() لواجهة ReadableStream قارئًا وتقفِل البث عليه. عندما يكون البث مقفلًا، لا يمكن الحصول على قارئ آخر إلى أن يتم تحرير هذا القارئ.

تُعرِض طريقة read() لواجهة ReadableStreamDefaultReader وعدًا يسمح بالوصول إلى القطعة التالية في قائمة الانتظار الداخلية للبث. يتم تنفيذ الطلب أو رفضه بنتيجة استنادًا إلى حالة البث. في ما يلي الاحتمالات المختلفة:

  • إذا كان هناك قطعة متاحة، سيتم تنفيذ الوعد بعنصر من الشكل
    { value: chunk, done: false }.
  • إذا أصبح مصدر البيانات مغلقًا، سيتم تنفيذ الوعد باستخدام عنصر من النوع
    { value: undefined, done: true }.
  • إذا حدث خطأ في البث، سيتم رفض الوعد مع تضمين الخطأ ذي الصلة.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

سمة locked

يمكنك التحقّق مما إذا كان مصدر بيانات قابل للقراءة مُقفَلًا من خلال الانتقال إلى ReadableStream.locked موقعه.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

عيّنات رمز بث سهل القراءة

يوضّح نموذج الرمز البرمجي أدناه جميع الخطوات أثناء تنفيذها. عليك أولاً إنشاء ReadableStream تحدِّد في underlyingSource (أي فئة TimestampSource) طريقة start(). تُطلب من خلال هذه الطريقة من controller البث enqueue() طابعًا زمنيًا كل ثانية لمدة عشر ثوانٍ. أخيرًا، يطلب من وحدة التحكّم close() البث. يمكنك استخدام هذا البث من خلال إنشاء قارئ باستخدام الطريقة getReader() واستدعاء read() إلى أن يصبح البث done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

التكرار غير المتزامن

قد لا تكون واجهة برمجة التطبيقات الأكثر ملاءمةً هي التحقّق من كل تكرار read() للحلقة إذا كان البث done. لحسن الحظ، ستتوفّر قريبًا طريقة أفضل لإجراء ذلك: التكرار غير المتزامن.

for await (const chunk of stream) {
  console.log(chunk);
}

إنّ أحد الحلول البديلة لاستخدام التكرار غير المتزامن اليوم هو تنفيذ السلوك باستخدام polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

إنشاء بث قابل للقراءة

تُنشئ طريقة tee() لواجهة ReadableStream البث القابل للقراءة الحالي، وتُعرِض صفيفًا من عنصرَين يحتوي على الفرعين الناتجَين كمثيلَين جديدَين من ReadableStream. يتيح ذلك لقراءين قراءة بث في الوقت نفسه. يمكنك إجراء ذلك، على سبيل المثال، في مشغّل خدمة إذا كنت تريد جلب استجابة من الخادم وبثّها إلى المتصفّح، ولكن أيضًا بثّها إلى التخزين المؤقت لمشغّل الخدمة. بما أنّه لا يمكن استخدام نص الردّ أكثر من مرّة، تحتاج إلى نسختَين للقيام بذلك. لإلغاء البث، عليك إلغاء كلا الفرعين الناتجَين. سيؤدي بدء بث محتوى إلى قفله بشكل عام طوال المدة، ما يمنع القرّاء الآخرين من قفله.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

تدفّقات وحدات البايت القابلة للقراءة

بالنسبة إلى أحداث البث التي تمثّل وحدات البايت، يتوفّر إصدار موسّع من البث القابل للقراءة لمعالجة وحدات البايت بكفاءة، لا سيما من خلال تقليل عدد النُسخ. تتيح أحداث "بث المحتوى" اكتساب قراء "جلب المحتوى الخاص بك" (BYOB). يمكن أن يقدّم التنفيذ التلقائي مجموعة من المخرجات المختلفة، مثل السلاسل أو مخازن مصفوفات في حالة WebSockets، في حين تضمن تدفّقات البايتات إخراج البايتات. بالإضافة إلى ذلك، يستفيد قراء BYOB من مزايا الثبات. ويعود سبب ذلك إلى أنّه في حال فصل المخزن المؤقت، يمكن ضمان عدم الكتابة في المخزن المؤقت نفسه مرّتين، وبالتالي تجنُّب حالات السباق. يمكن لقراء BYOB تقليل عدد المرات التي يحتاج فيها المتصفّح إلى تنفيذ عملية جمع القمامة، لأنّه يمكنه إعادة استخدام ذاكرة التخزين المؤقت.

إنشاء بث وحدات بت قابلة للقراءة

يمكنك إنشاء بث بايتات قابل للقراءة عن طريق تمرير مَعلمة type إضافية إلى الدالة الإنشائية ReadableStream().

new ReadableStream({ type: 'bytes' });

فريق underlyingSource

يتم منح المصدر الأساسي لبث وحدات البايت القابلة للقراءة ReadableByteStreamController للتلاعب به. تأخذ الطريقة ReadableByteStreamController.enqueue() وسيطة chunk تكون قيمتها ArrayBufferView. تعرض السمة ReadableByteStreamController.byobRequest طلب سحب BYOB الحالي، أو قيمة فارغة إذا لم يكن هناك طلب. أخيرًا، يعرض الحقل ReadableByteStreamController.desiredSize الحجم المطلوب لملء "القائمة الداخلية" للبث الخاضع للتحكّم.

فريق queuingStrategy

الوسيطة الثانية، وهي اختيارية أيضًا، لدالة الإنشاء ReadableStream() هي queuingStrategy. وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في "قائمة الانتظار" للبث، والتي تأخذ مَعلمة واحدة:

  • highWaterMark: عدد غير سالب من وحدات البايت يشير إلى الحد الأقصى للبث باستخدام استراتيجية الانتظار هذه. يُستخدَم ذلك لتحديد الضغط الخلفي الذي يظهر من خلال السمة ReadableByteStreamController.desiredSize المناسبة. ويحدِّد أيضًا حالات استدعاء طريقة pull() للمصدر الأساسي.

الطريقتان getReader() وread()

يمكنك بعد ذلك الوصول إلى ReadableStreamBYOBReader من خلال ضبط المَعلمة mode وفقًا لذلك: ReadableStream.getReader({ mode: "byob" }). يتيح ذلك التحكّم بشكل أكثر دقة في تخصيص ملف التخزين المؤقت لتجنُّب النُسخ. للقراءة من بث البايتات، عليك استدعاء ReadableStreamBYOBReader.read(view)، حيث يكون view هو ArrayBufferView.

نموذج تعليمات برمجية لبث وحدات البايت القابلة للقراءة

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

تعرض الدالة التالية مصادر بيانات قابلة للقراءة تتيح قراءة صفيف تم إنشاؤه عشوائيًا بكفاءة بدون نسخ البيانات. بدلاً من استخدام حجم قطعة محدّد مسبقًا يبلغ 1,024، يحاول ملء المخازن المؤقتة التي يقدّمها المطوّر، ما يتيح التحكّم الكامل.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

آليات بث قابل للكتابة

البث القابل للكتابة هو وجهة يمكنك كتابة البيانات فيها، ويتم تمثيله في JavaScript بكائن WritableStream. ويعمل هذا العنصر كعنصر تجريد على مستوى أعلى من وحدة معالجة بيانات أساسية، وهي وحدة معالجة بيانات I/O من المستوى الأدنى يتم فيها كتابة البيانات الأولية.

يتم كتابة البيانات في البث من خلال كاتب، كل جزء في المرة الواحدة. يمكن أن تتّخذ الشريحة العديد من الأشكال، تمامًا مثل الشرائح في قارئ. يمكنك استخدام أي رمز تريده لإنشاء المقاطع الجاهزة للكتابة. ويُطلق على الكاتب والرمز المرتبط به اسم منتج.

عند إنشاء كاتب وبدء الكتابة في بث (كاتب نشط)، يُقال أنّه مُقفَل عليه. يمكن لكاتب واحد فقط الكتابة في بث قابل للكتابة في المرة الواحدة. إذا أردت أن يبدأ كاتب آخر في كتابة محتوى في البث، عليك عادةً تحريره قبل ربط كاتب آخر به.

تتتبّع القائمة الداخلية الأجزاء التي تمّت كتابتها إلى البث ولكن لم تتم معالجتها بعد من خلال وحدة الاستقبال الأساسية.

استراتيجية وضع المحتوى في قائمة الانتظار هي عنصر يحدّد كيفية إرسال بث إشارة الضغط الخلفي استنادًا إلى حالة قائمة الانتظار الداخلية. تحدّد استراتيجية وضع المحتوى في "قائمة المحتوى التالي" حجمًا لكل جزء، وتقارن بين الحجم الإجمالي لجميع الأجزاء في "قائمة المحتوى التالي" وعدد محدّد يُعرف باسم الحد الأقصى المسموح به.

يُطلق على البنية النهائية اسم عنصر تحكّم. يحتوي كل مصدر بيانات قابل للكتابة على وحدة تحكّم مرتبطة تسمح لك بالتحكم في مصدر البيانات (على سبيل المثال، لإيقافه).

إنشاء بث قابل للكتابة

توفّر واجهة WritableStream في واجهة برمجة التطبيقات Streams API نموذجًا معياريًا لتدوين بيانات البث إلى وجهة، تُعرف باسم "الوجهة". يتضمّن هذا العنصر ميزة الضغط الخلفي وميزة وضع المحتوى في قائمة الانتظار. يمكنك إنشاء بث قابل للكتابة من خلال استدعاء عنصر الإنشاء WritableStream(). تتضمّن هذه الطريقة مَعلمة underlyingSink اختيارية تمثّل عنصرًا يتضمّن طُرقًا وسمات تحدّد سلوك مثيل البث الذي تم إنشاؤه.

فريق underlyingSink

يمكن أن يتضمّن underlyingSink الطرق الاختيارية التالية التي يحدّدها المطوّر. المَعلمة controller المُرسَلة إلى بعض الطرق هي WritableStreamDefaultController.

  • start(controller): يتمّ استدعاء هذه الطريقة على الفور عند إنشاء الكائن. يجب أن تهدف محتويات هذه الطريقة إلى الوصول إلى الحوض الأساسي. إذا كان من المفترض أن تتم هذه العملية بشكل غير متزامن، يمكن أن تُرجع وعدًا للإشارة إلى النجاح أو الفشل.
  • write(chunk, controller): سيتمّ استدعاء هذه الطريقة عندما تكون مجموعة جديدة من البيانات (المحدّدة في المَعلمة chunk) جاهزة للكتابة في المصرف الأساسي. ويمكن أن يعرض وعدًا لتحديد نجاح عملية الكتابة أو تعذّرها. لن يتمّ استدعاء هذه الطريقة إلّا بعد نجاح عمليات الكتابة السابقة، ولن يتمّ استدعاؤها مطلقًا بعد إغلاق البث أو إيقافه.
  • close(controller): سيتمّ استدعاء هذه الطريقة إذا أرسَل التطبيق إشارة بأنّه قد انتهى من كتابة المقاطع إلى البث. يجب أن تُجري العناصر ما يلزم لإنهاء عمليات الكتابة إلى الوحدة الأساسية لمعالجة البيانات، وإلغاء إمكانية الوصول إليها. إذا كانت هذه العملية غير متزامنة، يمكنها عرض وعد لإشارة إلى النجاح أو الفشل. لن يتمّ استدعاء هذه الطريقة إلا بعد نجاح كل عمليات الكتابة التي تمّ وضعها في "قائمة الانتظار".
  • abort(reason): سيتمّ استدعاء هذه الطريقة إذا أراد التطبيق إغلاق البث فجأة ووضعه في حالة خطأ. ويمكنه تنظيف أي موارد محجوزة، تمامًا مثل close()، ولكن سيتم استدعاء abort() حتى إذا تم وضع عمليات الكتابة في قائمة الانتظار. وسيتم تجاهل هذه الأجزاء. إذا كانت هذه العملية غير متزامنة، يمكنها عرض وعد للإشارة إلى النجاح أو الفشل. تحتوي المَعلمة reason على DOMString تصف سبب إيقاف البث.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

تمثل واجهة برمجة التطبيقات WritableStreamDefaultController Streams API وحدة تحكّم تتيح التحكّم في حالة WritableStream أثناء الإعداد، أو عند إرسال المزيد من الأجزاء للكتابة، أو في نهاية الكتابة. عند إنشاء WritableStream، يتم منح الحوض الأساسي مثيلًا WritableStreamDefaultController مقابلًا للتلاعب به. يحتوي WritableStreamDefaultController على طريقة واحدة فقط: WritableStreamDefaultController.error()، مما يؤدي إلى حدوث خطأ في أي تفاعلات مستقبلية مع البث المرتبط. تتيح WritableStreamDefaultController أيضًا استخدام السمة signal التي تعرض مثيلًا من AbortSignal، مما يسمح بإيقاف عملية WritableStream إذا لزم الأمر.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

فريق queuingStrategy

الوسيطة الثانية، وهي اختيارية أيضًا، لدالة الإنشاء WritableStream() هي queuingStrategy. وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في "قائمة المحتوى التالي" للبث، ويأخذ مَعلمتَين:

  • highWaterMark: رقم غير سالب يشير إلى الحد الأقصى للبث باستخدام استراتيجية الانتظار هذه.
  • size(chunk): دالة تحسب الحجم المحدّد غير السالب لقيمة الجزء المحدّدة وتعرضه. وتُستخدَم النتيجة لتحديد الضغط الخلفي الذي يظهر من خلال السمة WritableStreamDefaultWriter.desiredSize المناسبة.

الطريقتان getWriter() وwrite()

للكتابة في بث قابل للكتابة، تحتاج إلى كاتب، وسيكون WritableStreamDefaultWriter. تُعرِض طريقة getWriter() لواجهة WritableStream مثيلًا جديدًا من WritableStreamDefaultWriter وتُقفِل البث على هذا المثيل. عندما يكون تدفق الرسائل مقفَلاً، لا يمكن الحصول على كاتب آخر إلى أن يتم تحرير الكاتب الحالي.

تُسجِّل طريقة write() واجهة WritableStreamDefaultWriter مجموعة بيانات تم تمريرها في WritableStream ووحدة المعالجة الأساسية، ثم تُعيد وعدًا يتم حلّه للإشارة إلى نجاح عملية الكتابة أو فشلها. يُرجى العِلم أنّ معنى "النجاح" يعتمد على وحدة النقل الأساسية، فقد يشير ذلك إلى قبول المقطع، وليس بالضرورة أن يكون قد تم حفظه بأمان في وجهته النهائية.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

سمة locked

يمكنك التحقّق مما إذا كان مصدر بيانات قابل للكتابة قد تم قفله من خلال الوصول إلى WritableStream.locked موقعه.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

نموذج رمز بث Writable

يعرض نموذج الرمز البرمجي أدناه جميع الخطوات أثناء تنفيذها.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

توجيه بث قابل للقراءة إلى بث قابل للكتابة

يمكن توجيه بث قابل للقراءة إلى بث قابل للكتابة من خلال أسلوب pipeTo() للبث القابل للقراءة. تُوجِّه ReadableStream.pipeTo() القيمة الحالية ReadableStreamإلى WritableStream معيّنة وتُعرِض وعدًا يتم تنفيذه عند اكتمال عملية توجيه البيانات بنجاح، أو يتم رفضه في حال حدثت أي أخطاء.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

إنشاء بث تحويل

تمثّل واجهة TransformStream في Streams API مجموعة من البيانات القابلة للتحويل. يمكنك إنشاء بث تحويل من خلال استدعاء الدالة الإنشائية TransformStream() التي تنشئ كائن بث تحويل وتُرجعه من معالِجات معيّنة. تقبل دالة الإنشاء TransformStream() كوسيطة أولى عنصر JavaScript اختياريًا يمثّل transformer. يمكن أن تحتوي هذه العناصر على أيٍّ من الطرق التالية:

فريق transformer

  • start(controller): يتمّ استدعاء هذه الطريقة على الفور عند إنشاء الكائن. ويُستخدَم عادةً هذا الإجراء لإضافة أجزاء البادئة إلى "قائمة الانتظار" باستخدام controller.enqueue(). وستتم قراءة هذه الأجزاء من الجانب القابل للقراءة، ولكن لا تعتمد على أي عمليات كتابة في الجانب القابل للكتابة. إذا كانت هذه العملية التمهيدية غير متزامنة، على سبيل المثال لأنّ الحصول على أجزاء البادئة يتطلّب بعض الجهد، يمكن للدالة عرض وعد للإشارة إلى النجاح أو الفشل. سيؤدي الوعد المرفوض إلى حدوث خطأ في السلسلة. ستعيد بنية TransformStream() طرح أي استثناءات يتم طرحها.
  • transform(chunk, controller): يتمّ استدعاء هذه الطريقة عندما تكون مجموعة جديدة مكتوبة في الأساس على الجانب القابل للكتابة جاهزة للتحويل. يضمن تنفيذ البث عدم استدعاء هذه الدالة إلا بعد نجاح عمليات التحويل السابقة، ولن يتم استدعاؤها أبدًا قبل اكتمال start() أو بعد استدعاء flush(). تُنفِّذ هذه الدالة عملية التحويل الفعلية لبث التحويل. ويمكنه إضافة النتائج إلى "قائمة الانتظار" باستخدام controller.enqueue(). يسمح هذا الإجراء بكتابة قطعة واحدة في الجانب القابل للكتابة، ما يؤدي إلى عدم ظهور أيّ قطعة أو ظهور عدة قطع في الجانب القابل للقراءة، وذلك استنادًا إلى عدد مرّات استدعاء controller.enqueue(). إذا كانت عملية التحويل غير متزامنة، يمكن أن تعرِض هذه الدالة وعدًا للإشارة إلى نجاح عملية التحويل أو فشلها. سيؤدي الوعد المرفوض إلى حدوث خطأ في كل من الجانبَين القابلَين للقراءة والكتابة من مجرى التحويل. في حال عدم تقديم طريقة transform()، يتم استخدام عملية تحويل الهوية التي تضيف الشرائح إلى "قائمة الانتظار" بدون تغيير من الجانب القابل للكتابة إلى الجانب القابل للقراءة.
  • flush(controller): يتمّ استدعاء هذه الطريقة بعد أن يتم تحويل كلّ الأجزاء التي تمّ كتابتها على الجانب القابل للكتابة من خلال تمريرها بنجاح من خلال transform()، ويكون الجانب القابل للكتابة على وشك الإغلاق. ويُستخدَم هذا الإجراء عادةً لإدراج أجزاء اللاحقة في "قائمة الانتظار" على الجانب القابل للقراءة، قبل أن يصبح هذا الجانب أيضًا مغلقًا. إذا كانت عملية الفلاش غير متزامنة، يمكن للدالة عرض وعد لتحديد ما إذا كان الفلاش ناجحًا أو تعذّر إكماله، وسيتم إبلاغ المتصل بالدالة stream.writable.write() بالنتيجة. بالإضافة إلى ذلك، سيؤدي الوعد المرفوض إلى حدوث خطأ في كلّ من الجانبَين القابلَين للقراءة والكتابة في البث. يتم التعامل مع طرح استثناء بالطريقة نفسها التي يتم بها عرض وعد مُرفوض.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

استراتيجيات وضع المحتوى في "قائمة المحتوى التالي" في writableStrategy وreadableStrategy

المَعلمتان الثانيتان والثالثتان الاختياريتان لصانع TransformStream() هما استراتيجيةَا الانتظار writableStrategy وreadableStrategy. ويتم تحديدها على النحو الموضّح في القسمين readable وwritable stream على التوالي.

نموذج رمز بث التحويل

يعرض نموذج الرمز البرمجي التالي بثًا بسيطًا للتحويل.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

توجيه بث قابل للقراءة من خلال بث تحويل

توفّر طريقة pipeThrough() لواجهة ReadableStream طريقة قابلة للربط لتوجيه البث الحالي من خلال بث تحويل أو أي زوج آخر قابل للكتابة/القراءة. سيؤدي توجيه بث إلى قفله بشكل عام طوال مدة التوجيه، ما يمنع القرّاء الآخرين من قفله.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

يوضّح نموذج الرمز البرمجي التالي (المُعدّ بشكل مصطنع بعض الشيء) كيفية تنفيذ إصدار "صاخب" من fetch() يحوّل النص إلى نص كبير باستخدام وعد الاستجابة المعروض كبث وتحويل كل جزء إلى نص كبير. وتتمثل ميزة هذا النهج في أنّك لست بحاجة إلى الانتظار إلى أن يتم تنزيل المستند بأكمله، ما يمكن أن يحدث فرقًا كبيرًا عند التعامل مع الملفات الكبيرة.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

عرض توضيحي

يعرض المقطع التجريبي أدناه أحداث البث القابلة للقراءة والكتابة والتحويل أثناء تنفيذها. يتضمّن أيضًا أمثلة على سلاسل الأنابيب pipeThrough() وpipeTo()، ويوضّح أيضًا tee(). يمكنك اختياريًا تشغيل الإصدار التجريبي في نافذته الخاصة أو عرض رمز المصدر.

أحداث بث مفيدة متاحة في المتصفّح

هناك عدد من مصادر البيانات المفيدة المضمّنة في المتصفّح مباشرةً. يمكنك بسهولة إنشاء ReadableStream من قطعة بيانات. تُرجع طريقة stream()‎ في واجهة Blob ReadableStream الذي يعرض عند القراءة البيانات المضمّنة في العنصر المصغّر. تذكَّر أيضًا أنّ كائن File هو نوع معيّن من Blob، ويمكن استخدامه في أي سياق يمكن استخدام ملفّ نصي فيه.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

يُطلق على الصيغ المخصّصة للبث من TextDecoder.decode() وTextEncoder.encode() اسم TextDecoderStream و TextEncoderStream على التوالي.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

يمكنك بسهولة ضغط ملف أو فك ضغطه باستخدام بثَي التحويل CompressionStream و DecompressionStream على التوالي. يوضّح نموذج الرمز البرمجي أدناه كيفية تنزيل مواصفات Streams وضغطها (باستخدام gzip) مباشرةً في المتصفّح وكتابة الملف المضغوط على القرص مباشرةً.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

إنّ FileSystemWritableFileStream وfetch() تدفّقات الطلبات التجريبية في File System Access API هي مثالان على تدفّقات البيانات القابلة للكتابة في الوقت الفعلي.

تستخدِم Serial API بشكل كبير كلّ من مصادر البيانات القابلة للقراءة والكتابة.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

أخيرًا، تدمج واجهة برمجة تطبيقات WebSocketStream أحداث البث مع واجهة برمجة تطبيقات WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

مراجع مفيدة

الشكر والتقدير

تمت مراجعة هذه المقالة من قِبل Jake Archibald، François Beaufort، Sam Dutton، Mattias Buelens، Surma، Joe Medley، Adam Rice. ساعدتني منشورات المدونة التي نشرها Jake Archibald كثيرًا في فهم البث المباشر. استُوحيت بعض نماذج الرموز البرمجية من استكشافات مستخدم GitHub @bellbind، وتمت برمجة أجزاء من النثر بشكل كبير استنادًا إلى مستندات الويب MDN على Streams. لقد بذل مؤلفو معيار أحداث البث جهدًا رائعًا في كتابة هذه المواصفات. الصورة الرئيسية من إنشاء ريان لارا على Unsplash.