أحداث البث: الدليل النهائي

تعرَّف على كيفية استخدام مجموعات البث التي يمكن قراءتها وكتابتها وتحويلها من خلال Streams API.

تتيح لك Streams API الوصول بشكل آلي إلى مصادر البيانات الواردة عبر الشبكة أو التي يتم إنشاؤها بأي طريقة محلية ومعالجتها باستخدام JavaScript. يتضمن البث تقسيم المورد الذي تريد استلامه أو إرساله أو تحويله إلى أجزاء صغيرة، ثم معالجة هذه المقاطع شيئًا فشيئًا. فالبث هو إجراء تجريه المتصفّحات على أي حال عند تلقّي مواد عرض مثل HTML أو الفيديوهات لعرضها على صفحات الويب، إلا أنّ هذه الإمكانية لم تكن متاحة مطلقًا في JavaScript قبل طرح fetch مع عمليات البث في عام 2015.

في السابق، إذا كنت تريد معالجة مورد من نوع ما (سواء كان مقطع فيديو أو ملفًا نصيًا وما إلى ذلك)، فسيتعين عليك تنزيل الملف بالكامل، والانتظار حتى يتم إلغاء تسلسله بتنسيق مناسب، ثم معالجته. مع توفر مجموعات البث لـ JavaScript، يتغير هذا كله. يمكنك الآن معالجة البيانات الأولية باستخدام JavaScript تدريجيًا عندما تصبح متاحة على العميل، بدون الحاجة إلى إنشاء مخزن مؤقت أو سلسلة أو كائن ثنائي كبير (blob). يتيح ذلك إمكانية الوصول إلى عدد من حالات الاستخدام، وندرج بعضها أدناه:

  • تأثيرات الفيديو: إرسال فيديو مضمّن قابل للقراءة من خلال بث تحويلي يطبّق التأثيرات في الوقت الفعلي.
  • ضغط البيانات (de): نقل مصدر بيانات الملفات من خلال مصدر بيانات يعمل بشكل انتقائي (de) ضغط الملف.
  • فك ترميز الصور: يتم تمرير بث استجابة HTTP من خلال تدفق تحويل يفك ترميز وحدات البايت إلى بيانات صور نقطية، ثم من خلال بث تحويل آخر يترجم الصور النقطية إلى صور PNG. إذا تم تثبيت هذه الإضافة داخل معالج fetch لمشغّل الخدمات، تسمح لك هذه الميزة بإضافة تنسيقات جديدة للصور، مثل AVIF.

المتصفحات المتوافقة

ReadableStream وWritableStream

التوافق مع المتصفح

  • 43
  • 14
  • 65
  • 10.1

المصدر

TransformStream

التوافق مع المتصفح

  • 67
  • 79
  • 102
  • 14.1

المصدر

المفاهيم الأساسية

قبل أن أتحدث بالتفصيل عن الأنواع المختلفة لأحداث البث المباشر، سأتحدّث عن بعض المفاهيم الأساسية.

قِطَع

المقطع هو جزء واحد من البيانات تتم كتابته أو قراءته من مصدر بيانات. يمكن أن يكون من أي نوع؛ يمكن أن تحتوي جداول البيانات حتى على أجزاء من أنواع مختلفة. وفي معظم الأحيان، لن تكون المقطعة أكثر وحدة بيانات ذرية لتدفق معين. على سبيل المثال، قد يحتوي بث البايت على مجموعات تتألف من 16 وحدة بايت Uint8Array، بدلاً من وحدات بايت واحدة.

مجموعات بث قابلة للقراءة

تمثل ساحة المشاركات القابلة للقراءة مصدر بيانات يمكنك القراءة منه. بعبارة أخرى، تأتي البيانات من مصدر قابل للقراءة. بعبارة أخرى، البث القابل للقراءة هو مثيل للفئة ReadableStream.

مجموعات البث القابلة للكتابة

تمثل ساحة المشاركات القابلة للكتابة وجهة البيانات التي يمكنك الكتابة فيها. بعبارة أخرى، تدخل البيانات في مصدر قابل للكتابة. في الواقع، البث القابل للكتابة هو مثال على الفئة WritableStream.

تحويل ساحات المشاركات

يتكوّن بث التحويل من مجموعتَي بث: أيّ بث قابل للكتابة، يُعرف باسم جانبه القابل للكتابة، وساحة مشاركات قابلة للقراءة تُعرف باسم جانبها القابل للقراءة. ومن الأمثلة على ذلك الاستعارة الواقعية عن ذلك، وهي مترجم فوري يترجم المحتوى بشكل فوري من لغة إلى أخرى. وبطريقة خاصة بتدفق التحويل، تؤدي الكتابة على الجانب القابل للكتابة إلى توفير بيانات جديدة للقراءة على الجانب القابل للقراءة. بشكل عام، يمكن لأي عنصر يتضمّن السمة writable والسمة readable أن يكون بمثابة مصدر بيانات تحويل. مع ذلك، تسهّل فئة TransformStream العادية إنشاء مثل هذا الزوج الذي يتشابك بشكل صحيح.

سلاسل الأنابيب

تُستخدَم ساحات المشاركات بشكل أساسي من خلال ربطها ببعضها. يمكن توجيه البث القابل للقراءة مباشرةً إلى بث قابل للكتابة باستخدام طريقة pipeTo() للبث القابل للقراءة، أو يمكن تمريره عبر بث تحويل واحد أو أكثر أولاً، باستخدام أسلوب pipeThrough() القابل للقراءة. يُشار إلى مجموعة من التدفقات معًا بهذه الطريقة باسم سلسلة أنابيب.

الضغط الخلفي

بمجرد إنشاء سلسلة الممرات، سوف تنشر الإشارات المتعلقة بمدى سرعة تدفق الأجزاء من خلالها. إذا لم تتمكن أي خطوة في السلسلة من قبول المجموعات حتى الآن، فإنها تنشر إشارة إلى الخلف عبر سلسلة الممرات، حتى يتم في النهاية إخبار المصدر الأصلي بالتوقف عن إنتاج المقاطع بهذه السرعة. تُسمى عملية تسوية التدفق هذه الضغط العكسي.

قماش كتاني

يمكن ربط ساحة مشاركات قابلة للقراءة (تتم تسميتها على شكل حرف T الكبير) باستخدام طريقة tee(). سيؤدي ذلك إلى قفل ساحة المشاركات، أي لن تتمكن من استخدامها بشكل مباشر، ومع ذلك، ستنشئ مجموعتَين جديدتَين لبثَي البيانات اسما فرعَين ويمكن استخدامهما بشكل مستقل. ولا يمكن إرجاع البث المباشر في أي وقت، لأنّه لا يمكن لفّه أو إعادة تشغيله. سنتحدث عن ذلك في وقت لاحق.

رسم تخطيطي لسلسلة ممرات يتكون من تدفق قابل للقراءة يأتي من استدعاء لواجهة برمجة تطبيقات الجلب يتم تمريرها بعد ذلك عبر تدفق تحويل يتم فيه نقل مخرجاته ثم إرسالها إلى المتصفِّح لأول بث ناتج قابل للقراءة وإلى ذاكرة التخزين المؤقت لمشغِّل الخدمات في البث الثاني الناتج القابل للقراءة.
سلسلة الممرات.

آليات البث القابل للقراءة

البث القابل للقراءة هو مصدر بيانات يتم تمثيله في JavaScript بواسطة عنصر ReadableStream يتدفق من مصدر أساسي. تُنشئ الدالة الإنشائية ReadableStream() عنصر بث يمكن قراءته من المعالجات المحددة. هناك نوعان من المصادر الأساسية:

  • دفع المصادر: يعمل هذا الإجراء على إرسال البيانات إليك باستمرار عند الوصول إليها، ويمكنك بدء الوصول إلى البث أو إيقافه مؤقتًا أو إلغائه. وتشمل الأمثلة فيديوهات البث المباشر أو الأحداث التي يرسلها الخادم أو WebSockets.
  • تتطلّب مصادر السحب طلب البيانات منها بشكل صريح بعد الربط بها. وتشمل الأمثلة عمليات HTTP من خلال استدعاءات fetch() أو XMLHttpRequest.

تتم قراءة بيانات البث بشكل تسلسلي على شكل أجزاء صغيرة تُسمى الأجزاء. يُقال إنّ الأجزاء الموضوعة في ساحة مشاركات مدرَجة في قائمة الانتظار. هذا يعني أنهم ينتظرون في قائمة قائمة انتظار وجاهزين للقراءة. تتبع قائمة الانتظار الداخلية المقاطع التي لم تتم قراءتها بعد.

استراتيجية الإضافة إلى "قائمة المحتوى التالي" هي كائن يحدّد كيفية إشارة البث إلى الضغط العكسي بناءً على حالة قائمة الانتظار الداخلية. تحدّد استراتيجية "الإضافة إلى قائمة المحتوى التالي" حجمًا لكل مجموعة، وتقارن الحجم الإجمالي لكل المقاطع في قائمة الانتظار بعدد محدّد، تُعرف باسم علامة المياه العالية.

يقرأ أحد القرّاء المقاطع المعروضة في ساحة المشاركات. يسترد هذا القارئ البيانات مقطعًا واحدًا في كل مرة، مما يسمح لك بالقيام بأي نوع من العملية التي تريد القيام بها عليها. يُطلق على القارئ بالإضافة إلى رمز المعالجة الآخر معه اسم المستهلك.

تُسمى التركيبة التالية في هذا السياق وحدة تحكُّم. ويتضمن كل بث قابل للقراءة وحدة تحكم مرتبطة، كما يوحي اسمها، تتيح لك التحكم في البث.

لا يمكن إلا لقارئ واحد قراءة البث في كل مرة، فعندما ينشئ قارئًا ويبدأ في قراءة البث (أي يصبح قارئًا نشطًا)، يتم تأمين البث. إذا كنت تريد أن يتولى قارئ آخر قراءة البث، عليك عادةً إصدار قارئ أوّل قبل اتّخاذ أي إجراء آخر (ولكن يمكنك بدء البث).

إنشاء بث قابل للقراءة

يمكنك إنشاء بث قابل للقراءة من خلال طلب الرمز ReadableStream() لإنشاء البث. تحتوي الدالة الإنشائية على وسيطة اختيارية underlyingSource، والتي تمثل كائنًا بالطرق والخصائص التي تحدد كيف سيتصرف مثيل البث الذي تم إنشاؤه.

فريق underlyingSource

يمكن أن يستخدم هذا الطرق الاختيارية التالية التي يحددها المطوّر:

  • start(controller): يتم استدعاؤه فورًا عند إنشاء الكائن. يمكن للطريقة الوصول إلى مصدر البث وتنفيذ أي إجراءات أخرى مطلوبة لإعداد وظيفة البث. إذا تم إجراء هذه العملية بشكل غير متزامن، فيمكن أن تقدم الطريقة وعدًا بإظهار النجاح أو الفشل. المعلمة controller التي تم تمريرها إلى هذه الطريقة هي ReadableStreamDefaultController.
  • pull(controller): يمكن استخدامه للتحكّم في مجموعة البث من خلال استرجاع المزيد من المقاطع. ويتم استدعاؤه بشكل متكرر طالما أن قائمة الانتظار الداخلية للأجزاء في البث غير ممتلئة إلى أن تصل قائمة الانتظار إلى علامتها المائية العالية. إذا كانت نتيجة الاتصال بالرقم pull() متوقّعة، لن يتم استدعاء "pull()" مرة أخرى إلى أن يتم تنفيذ الوعد المذكور. في حال رفض الوعد، سيصبح البث خاطئًا.
  • cancel(reason): يتم الاتصال به عندما يلغي مستهلك البث البث.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

تتيح السمة ReadableStreamDefaultController الطرق التالية:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

فريق queuingStrategy

الوسيطة الثانية الاختيارية بالمثل، للدالة الإنشائية ReadableStream() هي queuingStrategy. وهو كائن يحدد اختياريًا استراتيجية وضع البث في قائمة المحتوى التالي، الأمر الذي يتطلب معلَمتين هما:

  • highWaterMark: رقم غير سالب يشير إلى ارتفاع علامة تبويب المجرى المائي باستخدام استراتيجية الإضافة إلى "قائمة المحتوى التالي" هذه.
  • size(chunk): دالة تحسب وتعرض الحجم غير السالب المحدود لقيمة المجموعة المحدّدة. يتم استخدام النتيجة لتحديد مقدار الضغط الخلفي، ويظهر هذا الضغط من خلال سمة ReadableStreamDefaultController.desiredSize المناسبة. ويتم أيضًا تطبيق هذه القاعدة عند استدعاء طريقة pull() للمصدر الأساسي.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

طريقتان getReader() وread()

للقراءة من ساحة مشاركات قابلة للقراءة، تحتاج إلى قارئ، وسيكون ReadableStreamDefaultReader. تنشئ الطريقة getReader() في الواجهة ReadableStream قارئًا وتقفل البث فيه. أثناء قفل البث، لا يمكن الحصول على قارئ آخر إلا بعد إصدار هذا البث.

تعرض الطريقة read() في واجهة ReadableStreamDefaultReader وعدًا يوفر إمكانية الوصول إلى المقطع التالي في قائمة الانتظار الداخلية للبث. يفي بالنتيجة أو يرفضها استنادًا إلى حالة البث. في ما يلي الاحتمالات المختلفة:

  • في حال توفُّر مقطع، سيتم تنفيذ الوعد من خلال استخدام عنصر من النموذج
    { value: chunk, done: false }.
  • في حال إغلاق البث، سيتم تنفيذ الوعد من خلال استخدام عنصر من النموذج
    { value: undefined, done: true }.
  • إذا حدث خطأ في البث، سيتم رفض الوعد بعرض الخطأ ذي الصلة.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

السمة locked

يمكنك معرفة ما إذا كانت مجموعة البث القابلة للقراءة مقفلة من خلال الانتقال إلى السمة ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

نماذج رموز البث القابلة للقراءة

يعرض نموذج الرمز أدناه جميع الخطوات قيد التنفيذ. عليك أولاً إنشاء ReadableStream تحدِّد في وسيطة underlyingSource الخاصة بها (أي الفئة TimestampSource) طريقة start(). تحدّد هذه الطريقة طابعًا زمنيًا من controller إلى enqueue() في البث كل ثانية خلال عشر ثوانٍ. وأخيرًا، يطلب من وحدة التحكّم close() مجموعة البث. تستهلك هذا البث من خلال إنشاء قارئ باستخدام طريقة getReader() واستدعاء read() إلى أن يصبح البث done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

تكرار غير متزامن

قد لا تكون واجهة برمجة التطبيقات الأكثر ملاءمة بعد التحقّق من كل تكرار تكرارات read() إذا كان البث done. لحسن الحظ، ستكون هناك قريبًا طريقة أفضل للقيام بذلك، وهي التكرار غير المتزامن.

for await (const chunk of stream) {
  console.log(chunk);
}

هناك حل بديل لاستخدام التكرار غير المتزامن اليوم وهو تنفيذ السلوك باستخدام رمز polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

بث محتوى يمكن قراءته

تعمل الطريقة tee() في واجهة ReadableStream على اختيار ساحة المشاركات الحالية القابلة للقراءة، من خلال عرض مصفوفة مكوّنة من عنصرَين تحتوي على الفرعَين الناتجَين على شكل مثيلات ReadableStream جديدة. يتيح ذلك للقرّاء قراءة البث بشكل متزامن. يمكنك مثلاً تنفيذ ذلك في مشغّل خدمات إذا كنت تريد جلب استجابة من الخادم وبثها إلى المتصفح، ولكن مع بثها أيضًا إلى ذاكرة التخزين المؤقت لعامل الخدمة. بما أنّه لا يمكن استهلاك نص الاستجابة أكثر من مرة، عليك الحصول على نسختَين للقيام بذلك. لإلغاء البث، يجب إلغاء كلا الفرعَين الناتجَين عن عملية البث. عادةً ما يؤدي النقر على البث إلى قفله طوال مدة البث، ما يمنع القرّاء الآخرين من قفله.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

مجموعات بث بايت قابلة للقراءة

بالنسبة إلى مجموعات البث التي تمثّل وحدات البايت، يتم توفير إصدار موسّع من مجموعة البث القابلة للقراءة لمعالجة وحدات البايت بكفاءة، لا سيما عن طريق تصغير عدد النُسخ. تتيح مجموعات بيانات وحدات البايت إمكانية اكتساب قرّاء المخزن المؤقت (BYOB) الخاص بك. يمكن أن يعطي التنفيذ الافتراضي نطاقًا من المُخرجات المختلفة مثل السلاسل أو المخازن المؤقتة للصفائف في حالة مآخذ الويب، بينما تضمن ساحات مشاركات البايت الحصول على إخراج بايت. بالإضافة إلى ذلك، تقدّم برامج قراءة BYOB مزايا متعلقة بالاستقرار. وذلك لأنه إذا تم فصل المخزن المؤقت، يمكن أن يضمن عدم إدخال إحداها مرتين على نفس المورد الاحتياطي، وبالتالي تجنب حالات العرق. يمكن لبرامج قراءة BYOB تقليل عدد المرات التي يحتاج فيها المتصفح إلى تشغيل مجموعة البيانات غير المرغوب فيها، لأنه يمكنه إعادة استخدام المخازن المؤقتة.

إنشاء دفق بايت قابل للقراءة

يمكنك إنشاء ساحة مشاركات بايت قابلة للقراءة من خلال تمرير معلَمة type إضافية إلى الدالة الإنشائية ReadableStream().

new ReadableStream({ type: 'bytes' });

فريق underlyingSource

يتم تخصيص رمز ReadableByteStreamController للمصدر الأساسي لبثّ البايت القابل للقراءة. تستخدم الطريقة ReadableByteStreamController.enqueue() وسيطة chunk وقيمتها ArrayBufferView. تعرض الخاصية ReadableByteStreamController.byobRequest طلب السحب الحالي BYOB، أو قيمة فارغة إذا لم يكن هناك أي منها. أخيرًا، تعرض السمة ReadableByteStreamController.desiredSize الحجم المطلوب لملء قائمة الانتظار الداخلية لمجموعة البث الخاضعة للرقابة.

فريق queuingStrategy

الوسيطة الثانية الاختيارية بالمثل، للدالة الإنشائية ReadableStream() هي queuingStrategy. وهو كائن يحدِّد اختياريًا استراتيجية قائمة المحتوى التالي للبث، والذي يأخذ معلَمة واحدة:

  • highWaterMark: عدد وحدات بايت غير سالب يشير إلى ارتفاع درجة حرارة التدفق باستخدام استراتيجية الإضافة إلى "قائمة المحتوى التالي" هذه ويُستخدم هذا المقياس لتحديد الضغط الخلفي، ويظهر هذا الضغط من خلال سمة ReadableByteStreamController.desiredSize المناسبة. ويتم أيضًا تطبيق هذه القاعدة عند استدعاء طريقة pull() للمصدر الأساسي.

طريقتان getReader() وread()

يمكنك بعد ذلك الحصول على إذن بالوصول إلى ReadableStreamBYOBReader من خلال ضبط المَعلمة mode وفقًا لذلك: ReadableStream.getReader({ mode: "byob" }). يسمح هذا بالتحكم بدقة أكبر في تخصيص المورد الاحتياطي لتجنب النسخ. للقراءة من ساحة مشاركات البايت، عليك طلب الرمز ReadableStreamBYOBReader.read(view)، حيث يتم استخدام view ArrayBufferView.

نموذج رمز بث بايت قابل للقراءة

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

تعرض الدالة التالية تدفقات بايت قابلة للقراءة تتيح القراءة الفعّالة بدون نسخة صفرية لصفيف يتم إنشاؤه عشوائيًا. وبدلاً من استخدام حجم مقطع محدد مسبقًا وهو 1024، يحاول ملء المورد الاحتياطي الذي يوفره المطور، مما يسمح بالتحكم الكامل.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

آليات البث القابل للكتابة

ساحة المشاركات القابلة للكتابة هي وجهة يمكنك كتابة البيانات فيها، ويتم تمثيلها في JavaScript بعنصر WritableStream. يكون هذا بمثابة تجريد على الجزء العلوي من الحوض الأساسي، وهو مستودع بيانات إدخال وإخراج عند مستوى أقل تتم كتابة البيانات الأولية فيه.

تتم كتابة البيانات في ساحة المشاركات باستخدام writer، كل مقطع على حدة. يمكن أن يتخذ المقطع العديد من الأشكال، تمامًا مثل الأجزاء الموجودة في القارئ. يمكنك استخدام أي تعليمة برمجية تريدها لإنتاج المقاطع جاهزة للكتابة؛ ويسمى الكاتب والكود المرتبط بها منتج.

عندما يتم إنشاء كاتب ويبدأ في الكتابة في ساحة مشاركات (كاتب نشط)، يُقال إنّه مقفل ولا يمكن الوصول إليه. لا يمكن لأكثر من كاتب واحد الكتابة في ساحة مشاركات قابلة للكتابة في وقت واحد. إذا أردت أن يبدأ كاتب آخر في الكتابة إلى البث المباشر، يجب عادةً إصدار الأغنية قبل إضافة كاتب آخر إليه.

تعمل قائمة الانتظار الداخلية على تتبُّع المقاطع التي تمت كتابتها في مجموعة البث ولم تتم معالجتها بعد في مصدر البيانات الأساسي.

استراتيجية الإضافة إلى "قائمة المحتوى التالي" هي كائن يحدّد كيفية إشارة البث إلى الضغط العكسي بناءً على حالة قائمة الانتظار الداخلية. تحدّد استراتيجية "الإضافة إلى قائمة المحتوى التالي" حجمًا لكل مجموعة، وتقارن الحجم الإجمالي لكل المقاطع في قائمة الانتظار بعدد محدّد، تُعرف باسم علامة المياه العالية.

يُطلق على البنية النهائية وحدة تحكُّم. يتضمّن كل بث قابل للكتابة وحدة تحكّم تتيح لك التحكّم في البث (لإلغائه على سبيل المثال).

إنشاء بث قابل للكتابة

توفّر واجهة WritableStream لـ Streams API تجريدًا عاديًا لكتابة بيانات البثّ إلى وجهة معيّنة، تُعرف باسم الحوض. يتضمّن هذا العنصر ميزة "ضغط الخلفية" المدمَجة والإضافة إلى "قائمة المحتوى التالي". يمكنك إنشاء بث قابل للكتابة من خلال استدعاء دالة الإنشاء WritableStream(). ويحتوي على معلَمة underlyingSink اختيارية تمثل كائنًا بطرق وخصائص تحدّد طريقة عمل مثيل البث الذي تم إنشاؤه.

فريق underlyingSink

يمكن أن يتضمّن underlyingSink الطرق الاختيارية التالية التي يحدّدها المطوِّر. المعلَمة controller التي تم تمريرها إلى بعض الطرق هي WritableStreamDefaultController.

  • start(controller): يتم استدعاء هذه الطريقة فورًا عند إنشاء الكائن. وينبغي أن تهدف محتويات هذه الطريقة إلى الوصول إلى الحوض الأساسي. إذا تم إجراء هذه العملية بشكل غير متزامن، فقد تؤدي إلى تحقيق وعود بالنجاح أو الفشل.
  • write(chunk, controller): سيتم استدعاء هذه الطريقة عندما تكون مجموعة جديدة من البيانات (المحددة في المعلَمة chunk) جاهزة للكتابة في المصدر الأساسي. يمكن أن يقدم وعدًا يشير إلى نجاح أو فشل عملية الكتابة. لا يتم استدعاء هذه الطريقة إلا بعد نجاح عمليات الكتابة السابقة، وليس بعد إغلاق البث أو إلغاءه مطلقًا.
  • close(controller): سيتم استدعاء هذه الطريقة إذا أشار التطبيق إلى أنّه قد انتهى من كتابة المقاطع إلى البث. يجب أن يبذل المحتوى كل ما هو ضروري لإنهاء عمليات الكتابة إلى الحوض الأساسي وإطلاق الوصول إليه. إذا كانت هذه العملية غير متزامنة، فيمكن أن تقدم وعدًا للإشارة إلى النجاح أو الفشل. لا يتم استدعاء هذه الطريقة إلا بعد نجاح جميع عمليات الكتابة في قائمة الانتظار بنجاح.
  • abort(reason): سيتم استدعاء هذه الطريقة إذا أشار التطبيق إلى أنّه يريد إغلاق البث بشكل مفاجئ ووضعه في حالة خطأ. يمكنها إزالة أي موارد متوقفة، مثل close()، ولكن سيتم استدعاء abort() حتى إذا كانت عمليات الكتابة في قائمة الانتظار. سيتم التخلص من هذه الأجزاء. إذا كانت هذه العملية غير متزامنة، فيمكن أن تقدم وعدًا يشير إلى نجاح العملية أو إخفاقها. تحتوي مَعلمة reason على DOMString يصف سبب إيقاف البث.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

تمثّل واجهة WritableStreamDefaultController في Streams API وحدة تحكّم تتيح التحكّم في حالة WritableStream أثناء عملية الإعداد، وذلك لأنّه يتم إرسال المزيد من المقاطع للكتابة أو في نهاية النص. عند إنشاء WritableStream، يتم تخصيص مثيل WritableStreamDefaultController لمعالجته. ثمة طريقة واحدة فقط في WritableStreamDefaultController: WritableStreamDefaultController.error()، ما يؤدي إلى حدوث خطأ في أي تفاعلات مستقبلية مع البث المرتبط. تتوافق WritableStreamDefaultController أيضًا مع السمة signal التي تعرض مثيل AbortSignal، ما يسمح بإيقاف عملية WritableStream إذا لزم الأمر.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

فريق queuingStrategy

الوسيطة الثانية الاختيارية بالمثل، للدالة الإنشائية WritableStream() هي queuingStrategy. وهو كائن يحدد اختياريًا استراتيجية وضع البث في قائمة المحتوى التالي، الأمر الذي يتطلب معلَمتين هما:

  • highWaterMark: رقم غير سالب يشير إلى ارتفاع علامة تبويب المجرى المائي باستخدام استراتيجية الإضافة إلى "قائمة المحتوى التالي" هذه.
  • size(chunk): دالة تحسب وتعرض الحجم غير السالب المحدود لقيمة المجموعة المحدّدة. يتم استخدام النتيجة لتحديد مقدار الضغط الخلفي، ويظهر هذا الضغط من خلال سمة WritableStreamDefaultWriter.desiredSize المناسبة.

طريقتان getWriter() وwrite()

للكتابة في ساحة مشاركات قابلة للكتابة، تحتاج إلى كاتب سيكون WritableStreamDefaultWriter. تعرض الطريقة getWriter() في واجهة WritableStream مثيلاً جديدًا من WritableStreamDefaultWriter وتقفل البث على ذلك المثيل. وأثناء قفل البث، لا يمكن اكتساب أي كاتب آخر إلا بعد إطلاق البث الحالي.

تكتب الطريقة write() في واجهة WritableStreamDefaultWriter جزءًا تم تمريره من البيانات إلى WritableStream ومصدره الأساسي، ثم تعرض الوعدًا الذي يشير إلى نجاح أو فشل عملية الكتابة. تجدر الإشارة إلى أن معنى كلمة "النجاح" يعود إلى العامل الأساسي، حيث قد يشير إلى قبول المقطع، وليس بالضرورة إلى حفظه بأمان في وجهته النهائية.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

السمة locked

يمكنك التأكد ممّا إذا كانت مجموعة البث القابلة للكتابة مقفلة من خلال الانتقال إلى خاصية WritableStream.locked الخاصة به.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

نموذج رمز البث القابل للكتابة

يعرض نموذج الرمز أدناه جميع الخطوات قيد التنفيذ.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

توجيه البث القابل للقراءة في البث القابل للكتابة

يمكن توجيه ساحة مشاركات قابلة للقراءة إلى بث قابل للكتابة من خلال طريقة pipeTo() التي يمكن قراءتها في ساحة المشاركات. توجّه ReadableStream.pipeTo() السمة ReadableStream الحالية إلى WritableStream محدّدة وتعرض وعًا يتم الوفاء به عند اكتمال عملية تضمين البيانات بنجاح، أو يتم رفضها في حال حدوث أي أخطاء.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

إنشاء بث تحويل

تمثّل واجهة TransformStream لـ Streams API مجموعة من البيانات القابلة للتحويل. يمكنك إنشاء تدفق تحويل عن طريق استدعاء الدالة الإنشائية TransformStream()، التي تنشئ كائن بث تحويل وتعرضه من المعالِجات المحددة. تقبل الدالة الإنشائية TransformStream() كوسيطة أولى كائن JavaScript اختياري يمثّل transformer. ويمكن أن تحتوي هذه الكائنات على أي من الطرق التالية:

فريق transformer

  • start(controller): يتم استدعاء هذه الطريقة فورًا عند إنشاء الكائن. يُستخدم عادةً لإدراج أجزاء البادئات في قائمة الانتظار، باستخدام controller.enqueue(). ستتم قراءة هذه المقاطع من الجانب القابل القراءة ولكن لا تعتمد على أي عمليات كتابة إلى الجانب القابل للكتابة. إذا كانت هذه العملية الأولية غير متزامنة، على سبيل المثال، لأنّها تستغرق بعض الجهد للحصول على مقاطع البادئة، يمكن أن تعرض الدالة تعهدًا بتقديم إشارة بالنجاح أو الفشل، وسيؤدي الوعد المرفوض إلى خطأ في البث. ستُعيد الدالة الإنشائية TransformStream() طرح أي استثناءات يتم طرحها.
  • transform(chunk, controller): يتم استدعاء هذه الطريقة عندما يكون مقطع جديد مكتوب في الأصل على الجانب القابل للكتابة جاهزًا للتحويل. تضمن عملية تنفيذ مصدر البيانات عدم استدعاء هذه الدالة إلا بعد نجاح عمليات التحويل السابقة، وليس قبل اكتمال start() أو بعد استدعاء flush(). تؤدي هذه الدالة عملية التحويل الفعلية لتدفق التحويل. يمكنها إدراج النتائج في قائمة الانتظار باستخدام controller.enqueue(). ويتيح ذلك لجزء واحد مكتوب على الجانب القابل للكتابة ألّا يظهر على الجانب القابل للقراءة أو عدة أجزاء على الجانب القابل للقراءة، بناءً على عدد مرات استدعاء controller.enqueue(). وإذا كانت عملية التحويل غير متزامنة، فيمكن أن تقدم هذه الدالة وعودًا بالإشارة إلى نجاح التحويل أو فشله. سيؤدي الوعد المرفوض إلى حدوث خطأ في جانبَي تدفق التحويل القابل للقراءة والكتابة. إذا لم يتم توفير طريقة transform()، يتم استخدام تحويل الهوية، ما يدرِج المقاطع التي لم يتم تغييرها من الجانب القابل للكتابة إلى الجانب القابل للقراءة.
  • flush(controller): يتم استدعاء هذه الطريقة بعد تحويل جميع المقاطع المكتوبة على الجانب القابل للكتابة من خلال المرور عبر transform() بنجاح، ويوشك الجزء القابل للكتابة على الإغلاق. عادةً ما يستخدم هذا لإدراج مقاطع اللاحقة في قائمة الانتظار للجانب القابل للقراءة، قبل أن يصبح مغلقًا أيضًا. إذا كانت عملية المسح غير متزامنة، يمكن أن تعرض الدالة وعدًا بإشارة إلى النجاح أو الإخفاق، وسيتم إبلاغ النتيجة إلى المتصل stream.writable.write(). بالإضافة إلى ذلك، سيؤدي الوعد المرفوض إلى حدوث خطأ في جانبَي البث القابل للقراءة والكتابة. يتم التعامل مع طرح استثناء تمامًا مثل إرجاع واعد مرفوض.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

استراتيجيتا وضع "writableStrategy" و"readableStrategy" في قائمة المحتوى التالي

المعلمتان الاختياريتان الثانية والثالثة في الدالة الإنشائية TransformStream() هما استراتيجيتان اختياريتان writableStrategy وreadableStrategy للإضافة إلى قائمة الانتظار. ويتم تعريفها كما هو موضّح في قسمَي البث قابل للقراءة وقابل للكتابة على التوالي.

تحويل نموذج رمز البث

يوضح الرمز البرمجي التالي تدفق تحويل بسيط قيد التنفيذ.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

إرسال بث مباشر يمكن قراءته من خلال تدفق تحويل

إنّ طريقة pipeThrough() في واجهة ReadableStream توفر طريقة تسلسلية لنقل البث الحالي من خلال بث تحويل أو أي زوج آخر قابل للكتابة أو القراءة. بشكل عام، يؤدي النقر على تدفق إلى قفله طوال مدة الممر، مما يمنع القراء الآخرين من قفله.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

يوضح نموذج الرمز التالي (مع بعض الشيء) كيف يمكنك تنفيذ إصدار "صاخب" من fetch() يكتب حروفًا كبيرة في كل النص من خلال استخدام وعد الاستجابة المعروضة كبث، وعدد أحرف كبيرة لكل مقطع. تتمثل ميزة هذا النهج في أنك لا تحتاج إلى انتظار تنزيل المستند بأكمله، مما قد يحدث فرقًا كبيرًا عند التعامل مع الملفات الكبيرة.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

الخصائص الديموغرافية

يعرض العرض التوضيحي أدناه مجموعات بث قابلة للقراءة وقابلة للكتابة وتحويلها أثناء العمل. تتضمّن أيضًا أمثلة على سلاسل الممرات "pipeThrough()" و"pipeTo()"، وتوضّح أيضًا السمة tee(). يمكنك اختياريًا تشغيل العرض التوضيحي في نافذته الخاصة أو عرض رمز المصدر.

أحداث بث مفيدة متوفرة في المتصفّح

هناك عدد من مجموعات البث المفيدة المضمّنة في المتصفح مباشرةً. يمكنك بسهولة إنشاء ReadableStream من كائن ثنائي كبير (blob). تعرض طريقة stream() للواجهة Blob عنصر ReadableStream الذي يؤدي عند القراءة إلى عرض البيانات المضمّنة في الكائن blob. وتذكَّر أيضًا أنّ كائن File هو نوع معيّن من Blob، ويمكن استخدامه في أي سياق يمكن استخدام كائن ثنائي الأبعاد.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

يُطلَق على صيغتَي البث TextDecoder.decode() وTextEncoder.encode() اسم TextDecoderStream وTextEncoderStream على التوالي.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

يمكن ضغط الملف أو فك ضغطه بسهولة من خلال عمليتَي تحويل البيانات CompressionStream وDecompressionStream على التوالي. يوضح نموذج الرمز البرمجي أدناه كيفية تنزيل مواصفات Streams، وضغطها (gzip) مباشرةً في المتصفح، وكتابة الملف المضغوط على القرص مباشرةً.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

إنّ ملفات File System Access API FileSystemWritableFileStream وواجهات برمجة التطبيقات fetch() التجريبية هي أمثلة على مجموعات البث القابلة للكتابة بشكل عام.

تستخدم Serial API بشكلٍ كبير مجموعات البث القابلة للقراءة والكتابة.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

أخيرًا، تدمج واجهة برمجة تطبيقات WebSocketStream مجموعات البث مع WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

مراجع مفيدة

شكر وتقدير

تمت مراجعة هذه المقالة من قِبل جيك أرشيبالد وفرانسوا بوفورت وسام دوتون وماتياس بولينز وسورما وجو ميدلي وآدم رايس. ساعدتني مشاركات مدونة جيك أرشيبالد كثيرًا في فهم ساحات المشاركات. بعض عيّنات التعليمات البرمجية مستوحاة من استكشافات مستخدم GitHub @bellbind وأجزاء أخرى من هذا النص تستند بشكل كبير إلى مستندات MDN على الويب في ساحة المشاركات. بذل مؤلفو Streams Standard عملاً ضخمًا في كتابة هذه المواصفات. صورة رئيسية من إعداد ريان لارا في Unلمحة