أحداث البث: الدليل النهائي

تعرَّف على كيفية استخدام عمليات القراءة والكتابة والتحويل في البيانات المتدفقة باستخدام Streams API.

تتيح لك واجهة Streams API الوصول آليًا إلى تدفقات البيانات التي يتم تلقّيها عبر الشبكة أو التي يتم إنشاؤها بأي وسيلة محلية ومعالجتها باستخدام JavaScript. يتضمّن البث تقسيم المورد الذي تريد تلقّيه أو إرساله أو تحويله إلى أجزاء صغيرة، ثم معالجة هذه الأجزاء جزءًا تلو الآخر. مع أنّ المتصفحات تتيح تشغيل المحتوى أثناء تنزيله عند تلقّي مواد عرض، مثل HTML أو الفيديوهات التي سيتم عرضها على صفحات الويب، لم تكن هذه الإمكانية متاحة من قبل للغة JavaScript إلى أن تم طرح fetch مع عمليات البث في عام 2015.

في السابق، إذا أردت معالجة نوع معيّن من الموارد (سواء كان فيديو أو ملفًا نصيًا أو غير ذلك)، كان عليك تنزيل الملف بأكمله، والانتظار إلى أن تتم إزالة تسلسله وتحويله إلى تنسيق مناسب، ثم معالجته. ومع توفّر عمليات البث في JavaScript، يتغيّر كل ذلك. يمكنك الآن معالجة البيانات الأولية باستخدام JavaScript بشكل تدريجي فور توفّرها على العميل، بدون الحاجة إلى إنشاء مخزن مؤقت أو سلسلة أو كائن ثنائي كبير. يؤدي ذلك إلى إتاحة عدد من حالات الاستخدام، بعضها مدرَج أدناه:

  • تأثيرات الفيديو: توجيه بث فيديو قابل للقراءة من خلال بث تحويل يطبّق التأثيرات في الوقت الفعلي
  • ضغط البيانات وفك ضغطها: نقل دفق ملفات من خلال دفق تحويل يضغط الملفات أو يفك ضغطها بشكل انتقائي.
  • فك ترميز الصور: توجيه بث استجابة HTTP من خلال بث تحويل يفك ترميز وحدات البايت إلى بيانات نقطية، ثم من خلال بث تحويل آخر يترجم الخرائط النقطية إلى ملفات PNG. إذا تم تثبيت هذه السمة داخل معالج fetch الخاص بأحد عاملي الخدمة، سيسمح لك ذلك بتعبئة تنسيقات الصور الجديدة، مثل AVIF، بشكل غير ملحوظ.

دعم المتصفح

ReadableStream وWritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

المفاهيم الأساسية

قبل أن أتطرّق إلى تفاصيل الأنواع المختلفة من أحداث البث، سأقدّم بعض المفاهيم الأساسية.

قِطَع

القطعة هي جزء واحد من البيانات يتم كتابته في مصدر بيانات أو قراءته منه. ويمكن أن يكون من أي نوع، بل يمكن أن تحتوي عمليات البث على أجزاء من أنواع مختلفة. في معظم الأحيان، لن تكون الأجزاء أصغر وحدة بيانات في بث معيّن. على سبيل المثال، قد يحتوي دفق البايت على أجزاء تتألف من 16 وحدة Uint8Array كيلوبايت، بدلاً من بايت واحد.

مصادر البيانات القابلة للقراءة

يمثّل مصدر البيانات القابل للقراءة مصدر بيانات يمكنك القراءة منه. بعبارة أخرى، تخرج البيانات من مصدر بيانات قابل للقراءة. وبشكل ملموس، فإنّ دفق البيانات القابل للقراءة هو مثيل للفئة ReadableStream

مصادر البيانات القابلة للكتابة

يمثّل مصدر البيانات القابل للكتابة وجهة للبيانات يمكنك الكتابة فيها. بعبارة أخرى، يتم إدخال البيانات إلى مصدر بيانات قابل للكتابة. على وجه التحديد، الدفق القابل للكتابة هو مثيل للفئة WritableStream.

تحويل أحداث البث

يتألف مصدر بيانات التحويل من زوج من مصادر البيانات: مصدر بيانات قابل للكتابة، يُعرف باسم الجانب القابل للكتابة، ومصدر بيانات قابل للقراءة، يُعرف باسم الجانب القابل للقراءة. ويمكن تشبيه ذلك في العالم الحقيقي بالمترجم الفوري الذي يترجم من لغة إلى أخرى أثناء المحادثة. بطريقة خاصة بدفق التحويل، تؤدي الكتابة إلى الجانب القابل للكتابة إلى إتاحة بيانات جديدة للقراءة من الجانب القابل للقراءة. على وجه التحديد، يمكن لأي عنصر يتضمّن السمتَين writable وreadable أن يعمل كدفق تحويل. ومع ذلك، يسهّل الصف TransformStream إنشاء زوج من المفاتيح المرتبطة بشكل صحيح.

سلاسل الأنابيب

تُستخدَم عمليات البث بشكل أساسي من خلال توجيهها إلى بعضها البعض. يمكن توجيه بيانات مصدر قابلة للقراءة مباشرةً إلى بيانات وجهة قابلة للكتابة باستخدام الطريقة pipeTo() الخاصة ببيانات المصدر القابلة للقراءة، أو يمكن توجيهها من خلال بيانات تحويل واحدة أو أكثر أولاً باستخدام الطريقة pipeThrough() الخاصة ببيانات المصدر القابلة للقراءة. يُشار إلى مجموعة من عمليات البث التي يتم ربطها معًا بهذه الطريقة باسم سلسلة ربط.

الضغط الخلفي

بعد إنشاء سلسلة أنابيب، سيتم نشر إشارات بشأن سرعة تدفّق الأجزاء خلالها. إذا تعذّر على أي خطوة في السلسلة قبول أجزاء، فإنّها تنقل إشارة إلى الخلف عبر سلسلة الأنابيب، إلى أن يتم إخبار المصدر الأصلي في النهاية بالتوقّف عن إنتاج الأجزاء بهذه السرعة. تُعرف عملية تسوية معدّل نقل البيانات هذه باسم "الضغط الخلفي".

Teeing

يمكن إنشاء نسخة من مصدر بيانات قابل للقراءة (سمّي بهذا الاسم نسبةً إلى شكل حرف "T" كبير) باستخدام الطريقة tee(). سيؤدي ذلك إلى قفل البث، أي عدم إمكانية استخدامه مباشرةً، ولكن سيتم إنشاء بثَّين جديدَين، يُطلق عليهما اسم "فروع"، ويمكن استخدامهما بشكل مستقل. تُعدّ عملية "التحضير" مهمة أيضًا لأنّه لا يمكن ترجيع أو إعادة تشغيل البث، وسنتحدّث عن ذلك لاحقًا.

مخطّط لسلسلة أنابيب تتألف من مصدر بيانات قابل للقراءة وارد من طلب إلى واجهة برمجة التطبيقات Fetch، ثم يتم تمريره عبر مصدر بيانات تحويل يتم تكراره ثم إرساله إلى المتصفّح للحصول على مصدر البيانات الأول القابل للقراءة وإلى ذاكرة التخزين المؤقت لبرنامج الخدمة للحصول على مصدر البيانات الثاني القابل للقراءة.
سلسلة أنابيب

آلية عمل بث قابل للقراءة

مصدر البيانات القابل للقراءة هو مصدر بيانات يتم تمثيله في JavaScript بواسطة عنصر ReadableStream يتدفق من مصدر أساسي. تنشئ الدالة الإنشائية ReadableStream() كائن دفق قابل للقراءة وتعرضه من المعالِجات المحدّدة. هناك نوعان من المستند المصدر:

  • مصادر البيانات التي يتم إرسالها: ترسل هذه المصادر البيانات إليك باستمرار عند الوصول إليها، ويعود إليك قرار بدء الوصول إلى بث البيانات أو إيقافه مؤقتًا أو إلغائه. وتشمل الأمثلة بث الفيديو المباشر، والأحداث التي يرسلها الخادم، أو WebSockets.
  • تتطلّب مصادر السحب أن تطلب البيانات منها بشكل صريح بعد الربط بها. وتشمل الأمثلة عمليات HTTP من خلال طلبات fetch() أو XMLHttpRequest.

تتم قراءة بيانات البث بالتسلسل في أجزاء صغيرة تُعرف باسم القطع. يُقال إنّ الأجزاء الموضوعة في بث مُدرَجة في قائمة الانتظار. وهذا يعني أنّها تنتظر في قائمة لتكون جاهزة للقراءة. يتتبّع قائمة انتظار داخلية الأجزاء التي لم تتم قراءتها بعد.

استراتيجية وضع في قائمة الانتظار هي عنصر يحدّد كيفية إرسال إشارة الضغط الخلفي إلى البث استنادًا إلى حالة قائمة الانتظار الداخلية. تحدّد استراتيجية وضع المحتوى في قائمة الانتظار حجم كل جزء، وتقارن الحجم الإجمالي لجميع الأجزاء في قائمة الانتظار برقم محدّد يُعرف باسم الحدّ الأقصى.

يقرأ قارئ الأجزاء داخل البث. يستردّ هذا القارئ البيانات جزءًا واحدًا في كل مرة، ما يتيح لك إجراء أي نوع من العمليات التي تريدها. يُطلق على القارئ ورمز المعالجة الآخر المرتبط به اسم مستهلك.

يُطلق على العنصر التالي في هذا السياق اسم وحدة التحكّم. يحتوي كل مصدر بيانات قابل للقراءة على وحدة تحكّم مرتبطة به، وكما يشير الاسم، تتيح لك هذه الوحدة التحكّم في مصدر البيانات.

يمكن لقارئ واحد فقط قراءة بث واحد في كل مرة، وعند إنشاء قارئ وبدء قراءة بث (أي أن يصبح قارئًا نشطًا)، يتم قفل هذا القارئ على هذا البث. إذا أردت أن يتولّى قارئ آخر قراءة البث، عليك عادةً إيقاف القارئ الأول قبل إجراء أي شيء آخر (مع أنّه يمكنك تكرار عمليات البث).

إنشاء مصدر بيانات قابل للقراءة

يمكنك إنشاء مصدر بيانات قابل للقراءة من خلال استدعاء الدالة الإنشائية الخاصة به ReadableStream(). يحتوي الدالة الإنشائية على وسيط اختياري underlyingSource، وهو يمثّل كائنًا يتضمّن طرقًا وخصائص تحدّد سلوك مثيل البث الذي تم إنشاؤه.

فريق underlyingSource

يمكن أن تستخدم هذه السمة الطرق الاختيارية التالية التي يحدّدها المطوّر:

  • start(controller): يتم استدعاؤه فور إنشاء العنصر. يمكن للطريقة الوصول إلى مصدر البث وتنفيذ أي إجراءات أخرى مطلوبة لإعداد وظيفة البث. إذا كان من المفترض أن تتم هذه العملية بشكل غير متزامن، يمكن أن تعرض الطريقة وعدًا للإشارة إلى النجاح أو الفشل. المَعلمة controller التي تم تمريرها إلى هذه الطريقة هي ReadableStreamDefaultController.
  • pull(controller): يمكن استخدامها للتحكّم في البث أثناء جلب المزيد من الأجزاء. يتم استدعاؤها بشكل متكرر ما دام قائمة انتظار الأجزاء الداخلية في البث غير ممتلئة، إلى أن تصل قائمة الانتظار إلى الحد الأقصى. إذا كانت نتيجة استدعاء pull() هي وعد، لن يتم استدعاء pull() مرة أخرى إلى أن يتم تنفيذ هذا الوعد. إذا تم رفض الوعد، سيحدث خطأ في البث.
  • cancel(reason): يتم استدعاؤها عندما يلغي مستهلك البث البث.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

تتيح ReadableStreamDefaultController الطرق التالية:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

فريق queuingStrategy

الوسيطة الثانية، وهي اختيارية أيضًا، لدالة الإنشاء ReadableStream() هي queuingStrategy. وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في قائمة الانتظار للبث، ويأخذ مَعلمتَين:

  • highWaterMark: رقم غير سالب يشير إلى الحدّ الأقصى لعدد العناصر في البث باستخدام استراتيجية وضع العناصر في قائمة الانتظار هذه.
  • size(chunk): دالة تحتسب وتعرض الحجم المحدود غير السالب لقيمة الجزء المحدّد. يتم استخدام النتيجة لتحديد الضغط الخلفي، والذي يظهر من خلال السمة ReadableStreamDefaultController.desiredSize المناسبة. ويتحكّم أيضًا في وقت استدعاء طريقة pull() للمصدر الأساسي.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

الطريقتان getReader() وread()

للقراءة من مصدر بيانات قابل للقراءة، تحتاج إلى قارئ، وهو عبارة عن ReadableStreamDefaultReader. تنشئ الطريقة getReader() للواجهة ReadableStream قارئًا وتمنع الوصول إلى البث إلا من خلاله. أثناء قفل القارئ، لا يمكن الحصول على قارئ آخر إلى أن يتم إطلاق هذا القارئ.

تعرض طريقة read() الخاصة بواجهة ReadableStreamDefaultReader وعدًا يتيح الوصول إلى الجزء التالي في قائمة الانتظار الداخلية الخاصة بالدفق. يتم تنفيذها أو رفضها مع عرض نتيجة استنادًا إلى حالة البث. في ما يلي الاحتمالات المختلفة:

  • إذا كانت هناك أجزاء متاحة، سيتم تنفيذ الوعد باستخدام عنصر من النموذج
    { value: chunk, done: false }.
  • إذا تم إغلاق البث، سيتم تنفيذ الوعد باستخدام عنصر من النموذج
    { value: undefined, done: true }.
  • إذا حدث خطأ في البث، سيتم رفض الوعد مع الخطأ ذي الصلة.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

السمة locked

يمكنك التحقّق مما إذا كان قد تم قفل بث قابل للقراءة من خلال الوصول إلى السمة ReadableStream.locked الخاصة به.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

عيّنات التعليمات البرمجية لتدفّق البيانات القابل للقراءة

يعرض نموذج الرمز البرمجي أدناه جميع الخطوات أثناء التنفيذ. عليك أولاً إنشاء ReadableStream الذي يحدّد طريقة start() في وسيطته underlyingSource (أي الفئة TimestampSource). تطلب هذه الطريقة من controller في البث enqueue() طابعًا زمنيًا كل ثانية لمدة عشر ثوانٍ. أخيرًا، يطلب من وحدة التحكّم close() البث. يمكنك استخدام هذا البث من خلال إنشاء قارئ باستخدام الطريقة getReader() واستدعاء read() إلى أن يصبح البث done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

التكرار غير المتزامن

قد لا تكون واجهة برمجة التطبيقات الأكثر ملاءمة هي التحقّق من حالة البث done عند كل تكرار لحلقة read(). لحسن الحظ، ستتوفّر قريبًا طريقة أفضل لإجراء ذلك، وهي التكرار غير المتزامن.

for await (const chunk of stream) {
  console.log(chunk);
}

يمكنك استخدام حلّ بديل لتنفيذ التكرار غير المتزامن اليوم من خلال تنفيذ السلوك باستخدام polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

توزيع بيانات من مصدر قابل للقراءة

تُنشئ طريقة tee() التابعة للواجهة ReadableStream نسخة من دفق القراءة الحالي، وتعرض مصفوفة تتضمّن عنصرين يحتويان على الفرعين الناتجين كنسختين جديدتين من ReadableStream. يتيح ذلك لشخصَين قراءة بث في الوقت نفسه. يمكنك إجراء ذلك، على سبيل المثال، في عامل الخدمة إذا أردت جلب ردّ من الخادم ونقله إلى المتصفّح، ولكن أيضًا نقله إلى ذاكرة التخزين المؤقت لعامل الخدمة. بما أنّه لا يمكن استخدام نص الرد أكثر من مرة، يجب أن يكون لديك نسختان لتنفيذ ذلك. لإلغاء البث، عليك بعد ذلك إلغاء كلتا الفرعَين الناتجَين. سيؤدي ربط بث بشكل عام إلى قفله طوال مدته، ما يمنع القرّاء الآخرين من قفله.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

مصادر بيانات البايت القابلة للقراءة

بالنسبة إلى مصادر البيانات التي تمثّل وحدات بايت، يتم توفير إصدار موسّع من مصدر البيانات القابل للقراءة للتعامل مع وحدات البايت بكفاءة، لا سيما من خلال تقليل النسخ. تتيح حِزم البيانات الحصول على برامج قراءة خارجية (BYOB). يمكن أن يقدّم التنفيذ التلقائي مجموعة من المخرجات المختلفة، مثل السلاسل أو مخازن المصفوفات في حالة WebSockets، بينما تضمن تدفقات البايتات إخراج البايتات. بالإضافة إلى ذلك، يستفيد القرّاء الذين يستخدمون ميزة "استخدام شبكة الجوّال الخاصة بك" من مزايا الاستقرار. ويرجع ذلك إلى أنّه في حال فصل المخزن المؤقت، يمكن ضمان عدم الكتابة في المخزن المؤقت نفسه مرتين، وبالتالي تجنُّب حالات التزامن. يمكن لقارئات BYOB تقليل عدد المرات التي يحتاج فيها المتصفّح إلى تنفيذ عملية جمع البيانات غير الضرورية، لأنّه يمكنه إعادة استخدام المخازن المؤقتة.

إنشاء مصدر بيانات بايت قابل للقراءة

يمكنك إنشاء دفق بايت قابل للقراءة عن طريق تمرير مَعلمة type إضافية إلى الدالة الإنشائية ReadableStream().

new ReadableStream({ type: 'bytes' });

فريق underlyingSource

يتم منح ReadableByteStreamController للتعامل مع المصدر الأساسي لتدفق وحدات البايت القابلة للقراءة. يأخذ الإجراء ReadableByteStreamController.enqueue() وسيطة chunk تكون قيمتها ArrayBufferView. تعرض السمة ReadableByteStreamController.byobRequest طلب السحب الحالي لـ BYOB، أو القيمة null إذا لم يكن هناك طلب سحب. أخيرًا، تعرض السمة ReadableByteStreamController.desiredSize الحجم المطلوب لملء قائمة الانتظار الداخلية للبث المتحكَّم فيه.

فريق queuingStrategy

الوسيطة الثانية، وهي اختيارية أيضًا، لدالة الإنشاء ReadableStream() هي queuingStrategy. وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في قائمة الانتظار للبث، ويأخذ مَعلمة واحدة:

  • highWaterMark: عدد غير سالب من وحدات البايت يشير إلى الحدّ الأعلى لمعدّل نقل البيانات في البث باستخدام استراتيجية وضع العناصر في قائمة الانتظار هذه. يُستخدَم هذا الإجراء لتحديد الضغط الخلفي، ويظهر من خلال السمة ReadableByteStreamController.desiredSize المناسبة. ويتحكّم أيضًا في وقت استدعاء طريقة pull() للمصدر الأساسي.

الطريقتان getReader() وread()

يمكنك بعد ذلك الوصول إلى ReadableStreamBYOBReader من خلال ضبط المَعلمة mode على النحو التالي: ReadableStream.getReader({ mode: "byob" }). ويتيح ذلك التحكّم بشكلٍ أكثر دقة في عملية تخصيص المخزن المؤقت لتجنُّب النسخ. للقراءة من دفق البايت، عليك استدعاء ReadableStreamBYOBReader.read(view)، حيث view هو ArrayBufferView.

عيّنة التعليمات البرمجية لتدفّق البايت القابل للقراءة

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

تعرض الدالة التالية تدفقات بايت قابلة للقراءة تتيح قراءة فعّالة بدون نسخ لمصفوفة تم إنشاؤها بشكل عشوائي. بدلاً من استخدام حجم مقسّم محدّد مسبقًا يبلغ 1,024، تحاول هذه الطريقة ملء المخزن المؤقت الذي يوفّره المطوّر، ما يتيح التحكّم الكامل.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

آليات ساحة المشاركات القابلة للكتابة

مصدر البيانات القابل للكتابة هو وجهة يمكنك كتابة البيانات فيها، ويتم تمثيله في JavaScript بواسطة الكائن WritableStream. ويعمل هذا الإجراء كطبقة تجريد فوق مصدر أساسي، وهو مصدر إدخال/إخراج منخفض المستوى يتم فيه كتابة البيانات الأولية.

يتمّ تسجيل البيانات في مصدر البيانات من خلال برنامج تسجيل، جزء واحد في كلّ مرّة. يمكن أن تتخذ الأجزاء أشكالاً عديدة، تمامًا مثل الأجزاء في قارئ. يمكنك استخدام أي رمز تريده لإنتاج الأجزاء الجاهزة للكتابة، ويُطلق على الكاتب والرمز المرتبط به اسم منتج.

عند إنشاء كاتب وبدء الكتابة إلى مصدر (كاتب نشط)، يُقال إنّه مقفل عليه. يمكن لكاتب واحد فقط الكتابة إلى بث قابل للكتابة في وقت واحد. إذا أردت أن يبدأ كاتب آخر الكتابة إلى البث، عليك عادةً إصداره، ثم ربط كاتب آخر به.

يتتبّع قائمة انتظار داخلية الأجزاء التي تمت كتابتها في البث ولكن لم تتم معالجتها بعد من خلال المصدر الأساسي.

استراتيجية وضع في قائمة الانتظار هي عنصر يحدّد كيفية إرسال إشارة الضغط الخلفي إلى البث استنادًا إلى حالة قائمة الانتظار الداخلية. تحدّد استراتيجية وضع المحتوى في قائمة الانتظار حجم كل جزء، وتقارن الحجم الإجمالي لجميع الأجزاء في قائمة الانتظار برقم محدّد يُعرف باسم الحدّ الأقصى.

يُطلق على البنية النهائية اسم وحدة التحكّم. يحتوي كل بث قابل للكتابة على وحدة تحكّم مرتبطة تتيح لك التحكّم في البث (على سبيل المثال، لإيقافه).

إنشاء مصدر بيانات قابل للكتابة

توفّر واجهة WritableStream الخاصة بواجهة Streams API تجريدًا عاديًا لكتابة بيانات البث إلى وجهة، تُعرف باسم مصدر البيانات. يتضمّن هذا العنصر ميزة "الضغط الخلفي" المضمّنة وميزة وضع العناصر في قائمة الانتظار. يمكنك إنشاء بث قابل للكتابة من خلال استدعاء الدالة الإنشائية WritableStream(). تحتوي هذه الدالة على المَعلمة الاختيارية underlyingSink التي تمثّل عنصرًا يتضمّن طرقًا وخصائص تحدّد سلوك مثيل مصدر البيانات الذي تم إنشاؤه.

فريق underlyingSink

يمكن أن يتضمّن underlyingSink الطرق الاختيارية التالية التي يحدّدها المطوّر. المَعلمة controller التي يتم تمريرها إلى بعض الطرق هي WritableStreamDefaultController.

  • start(controller): يتم استدعاء هذا الإجراء فور إنشاء العنصر. يجب أن يهدف محتوى هذه الطريقة إلى الوصول إلى مصدر البيانات الأساسي. إذا كان من المفترض أن تتم هذه العملية بشكل غير متزامن، يمكنها عرض وعد للإشارة إلى النجاح أو الفشل.
  • write(chunk, controller): سيتم استدعاء هذه الطريقة عندما تكون مجموعة جديدة من البيانات (المحدّدة في المَعلمة chunk) جاهزة للكتابة إلى مصدر البيانات الأساسي. يمكن أن تعرض هذه الدالة وعدًا بالإشارة إلى نجاح عملية الكتابة أو تعذّرها. لن يتم استدعاء هذه الطريقة إلا بعد نجاح عمليات الكتابة السابقة، ولن يتم استدعاؤها أبدًا بعد إغلاق أو إلغاء البث.
  • close(controller): سيتم استدعاء هذه الطريقة إذا أشار التطبيق إلى أنّه انتهى من كتابة أجزاء في البث. يجب أن تتضمّن المحتويات كل ما يلزم لإنهاء عمليات الكتابة في مصدر البيانات الأساسي وإتاحة الوصول إليه. إذا كانت هذه العملية غير متزامنة، يمكنها عرض وعد للإشارة إلى النجاح أو الإخفاق. لن يتم استدعاء هذه الطريقة إلا بعد نجاح جميع عمليات الكتابة التي تم وضعها في قائمة الانتظار.
  • abort(reason): سيتم استدعاء هذه الطريقة إذا أشار التطبيق إلى أنّه يريد إغلاق البث بشكل مفاجئ ووضعه في حالة خطأ. يمكنه تنظيف أي موارد محتجزة، تمامًا مثل close()، ولكن سيتم استدعاء abort() حتى إذا تم وضع عمليات الكتابة في قائمة الانتظار. وسيتم تجاهل هذه الأجزاء. إذا كانت هذه العملية غير متزامنة، يمكنها عرض وعد للإشارة إلى النجاح أو الفشل. تحتوي المَعلمة reason على DOMString يصف سبب إيقاف البث.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

تمثّل واجهة WritableStreamDefaultController في Streams API أداة تحكّم تتيح التحكّم في حالة WritableStream أثناء عملية الإعداد، أو عند إرسال المزيد من الأجزاء للكتابة، أو في نهاية عملية الكتابة. عند إنشاء WritableStream، يتم منح المصدر الأساسي مثيلاً مطابقًا من WritableStreamDefaultController للتعديل. يحتوي WritableStreamDefaultController على طريقة واحدة فقط: WritableStreamDefaultController.error()، ما يؤدي إلى حدوث خطأ في أي تفاعلات مستقبلية مع البث المرتبط. تتيح السمة WritableStreamDefaultController أيضًا استخدام السمة signal التي تعرض مثيلاً من AbortSignal، ما يتيح إيقاف العملية WritableStream إذا لزم الأمر.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

فريق queuingStrategy

الوسيطة الثانية، وهي اختيارية أيضًا، لدالة الإنشاء WritableStream() هي queuingStrategy. وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في قائمة الانتظار للبث، ويأخذ مَعلمتَين:

  • highWaterMark: رقم غير سالب يشير إلى الحدّ الأقصى لعدد العناصر في البث باستخدام استراتيجية وضع العناصر في قائمة الانتظار هذه.
  • size(chunk): دالة تحتسب وتعرض الحجم المحدود غير السالب لقيمة الجزء المحدّد. يتم استخدام النتيجة لتحديد الضغط الخلفي، والذي يظهر من خلال السمة WritableStreamDefaultWriter.desiredSize المناسبة.

الطريقتان getWriter() وwrite()

لكتابة بيانات في دفق قابل للكتابة، تحتاج إلى كاتب، وهو عبارة عن WritableStreamDefaultWriter. تعرض الطريقة getWriter() للواجهة WritableStream مثيلاً جديدًا من WritableStreamDefaultWriter وتمنع الوصول إلى البث إلا من خلال هذا المثال. أثناء قفل البث، لا يمكن الحصول على أي كاتب آخر إلى أن يتم تحرير الكاتب الحالي.

تكتب طريقة write() في واجهة WritableStreamDefaultWriter جزءًا من البيانات تم تمريره إلى WritableStream والمصدر الأساسي، ثم تعرض وعدًا يتم تنفيذه للإشارة إلى نجاح عملية الكتابة أو فشلها. يُرجى العِلم أنّ معنى "النجاح" يعتمد على المصدر الأساسي، وقد يشير إلى أنّه تم قبول الجزء، وليس بالضرورة أنّه تم حفظه بأمان في وجهته النهائية.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

السمة locked

يمكنك التحقّق مما إذا كان قد تم قفل بث قابل للكتابة من خلال الوصول إلى السمة WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

عيّنة التعليمات البرمجية لتدفق البيانات القابل للكتابة

يعرض نموذج الرمز البرمجي أدناه جميع الخطوات أثناء التنفيذ.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

توجيه بث قابل للقراءة إلى بث قابل للكتابة

يمكن نقل بيانات مصدر بيانات قابل للقراءة إلى مصدر بيانات قابل للكتابة باستخدام طريقة pipeTo() الخاصة بمصدر البيانات القابل للقراءة. تنقل الدالة ReadableStream.pipeTo() قيمة ReadableStream الحالية إلى WritableStream المحدّدة وتعرض وعدًا يتم تنفيذه عند اكتمال عملية النقل بنجاح، أو يتم رفضه في حال حدوث أي أخطاء.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

إنشاء ساحة مشاركات

تمثّل واجهة TransformStream في Streams API مجموعة من البيانات القابلة للتحويل. يمكنك إنشاء دفق تحويل من خلال استدعاء الدالة الإنشائية TransformStream()، ما يؤدي إلى إنشاء كائن دفق تحويل وعرضه من المعالجات المحدّدة. تقبل دالة الإنشاء TransformStream() كأول وسيط لها كائن JavaScript اختياريًا يمثّل transformer. يمكن أن تحتوي هذه العناصر على أيّ من الطرق التالية:

فريق transformer

  • start(controller): يتم استدعاء هذا الإجراء فور إنشاء العنصر. يُستخدم هذا الإجراء عادةً لوضع أجزاء البادئة في قائمة الانتظار، وذلك باستخدام controller.enqueue(). وسيتم قراءة هذه الأجزاء من الجانب القابل للقراءة، ولكنها لا تعتمد على أي عمليات كتابة في الجانب القابل للكتابة. إذا كانت هذه العملية الأولية غير متزامنة، مثلاً لأنّها تتطلّب بعض الجهد للحصول على أجزاء البادئة، يمكن أن تعرض الدالة وعدًا للإشارة إلى النجاح أو الفشل، وسيؤدي الوعد المرفوض إلى حدوث خطأ في البث. سيتم إعادة طرح أي استثناءات تم طرحها من خلال الدالة الإنشائية TransformStream().
  • transform(chunk, controller): يتم استدعاء هذه الطريقة عندما تكون هناك مجموعة جديدة تمت كتابتها في الجانب القابل للكتابة وجاهزة للتحويل. يضمن تنفيذ البث أنّه لن يتم استدعاء هذه الدالة إلا بعد نجاح عمليات التحويل السابقة، ولن يتم استدعاؤها قبل اكتمال start() أو بعد استدعاء flush(). تنفّذ هذه الدالة عملية التحويل الفعلية لتدفق التحويل. يمكنه إضافة النتائج إلى قائمة الانتظار باستخدام controller.enqueue(). يسمح ذلك بأن يؤدي جزء واحد مكتوب على الجانب القابل للكتابة إلى صفر أو عدة أجزاء على الجانب القابل للقراءة، وذلك استنادًا إلى عدد المرات التي يتم فيها استدعاء controller.enqueue(). إذا كانت عملية التحويل غير متزامنة، يمكن أن تعرض هذه الدالة وعدًا للإشارة إلى نجاح عملية التحويل أو فشلها. سيؤدي رفض الوعد إلى حدوث خطأ في كل من الجانبين القابل للقراءة والقابل للكتابة من دفق التحويل. في حال عدم توفير طريقة transform()، يتم استخدام عملية تحويل المعرّف، ما يؤدي إلى وضع أجزاء في قائمة الانتظار بدون تغيير من الجانب القابل للكتابة إلى الجانب القابل للقراءة.
  • flush(controller): يتم استدعاء هذه الطريقة بعد أن يتم تحويل جميع الأجزاء المكتوبة إلى الجانب القابل للكتابة من خلال المرور بنجاح عبر transform()، وقبل إغلاق الجانب القابل للكتابة. يُستخدم هذا عادةً لوضع أجزاء اللاحقة في قائمة الانتظار على الجانب القابل للقراءة، قبل أن يتم إغلاق هذا الجانب أيضًا. إذا كانت عملية الإفراغ غير متزامنة، يمكن أن تعرض الدالة وعدًا للإشارة إلى النجاح أو الفشل، وسيتم إبلاغ المتصل بـ stream.writable.write() بالنتيجة. بالإضافة إلى ذلك، سيؤدي رفض الوعد إلى حدوث خطأ في كل من الجانبين القابل للقراءة والكتابة من البث. ويتم التعامل مع طرح استثناء بالطريقة نفسها التي يتم بها التعامل مع عرض مرفوض.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

استراتيجيتَا وضع المحتوى في قائمة الانتظار writableStrategy وreadableStrategy

المَعلمتان الاختياريتان الثانية والثالثة للدالة الإنشائية TransformStream() هما استراتيجيتان اختياريتان writableStrategy وreadableStrategy لتحديد ترتيب تنفيذ المهام. ويتم تحديدها كما هو موضّح في قسمَي البيانات القابلة للقراءة والبيانات القابلة للكتابة على التوالي.

عيّنة رمز تحويل البث

يعرض نموذج الرمز البرمجي التالي عملية بسيطة لتحويل البيانات.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

توجيه دفق قابل للقراءة من خلال دفق تحويل

توفّر طريقة pipeThrough() التابعة لواجهة ReadableStream طريقة قابلة للتسلسل لتوجيه البث الحالي من خلال بث تحويل أو أي زوج آخر قابل للقراءة والكتابة. سيؤدي نقل بث إلى قفله بشكل عام طوال مدة النقل، ما يمنع القراء الآخرين من قفله.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

يوضّح نموذج الرمز التالي (الذي يبدو مصطنعًا بعض الشيء) كيف يمكنك تنفيذ نسخة "صراخ" من fetch() تحوّل كل النص إلى أحرف كبيرة من خلال استخدام وعد الاستجابة الذي تم إرجاعه كبيانات متدفقة وتحوّل كل جزء إلى أحرف كبيرة. تتمثّل ميزة هذا الأسلوب في أنّه لا يتطلّب انتظار تنزيل المستند بأكمله، ما قد يحدث فرقًا كبيرًا عند التعامل مع الملفات الكبيرة.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

عرض توضيحي

يعرض العرض التوضيحي أدناه عمليات قراءة وكتابة وتحويل البيانات في بث مباشر. يتضمّن هذا المستند أيضًا أمثلة على سلاسل الأنابيب pipeThrough() وpipeTo()، ويوضّح أيضًا tee(). يمكنك اختياريًا تشغيل العرض التوضيحي في نافذة منفصلة أو عرض رمز المصدر.

قنوات مفيدة متوفّرة في المتصفّح

يتضمّن المتصفّح عددًا من خلاصات البيانات المفيدة. يمكنك بسهولة إنشاء ReadableStream من كائن ثنائي كبير الحجم. تعرض طريقة stream() في واجهة Blob ReadableStream، وعند قراءتها، تعرض البيانات الواردة في الكائن الثنائي الكبير. تذكَّر أيضًا أنّ الكائن File هو نوع محدّد من Blob، ويمكن استخدامه في أي سياق يمكن فيه استخدام كائن ثنائي كبير.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

يُطلق على صيغ البث المباشر من TextDecoder.decode() وTextEncoder.encode() اسم TextDecoderStream وTextEncoderStream على التوالي.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

يمكنك بسهولة ضغط ملف أو فك ضغطه باستخدام CompressionStream وDecompressionStream على التوالي. يوضّح نموذج الرمز البرمجي أدناه كيف يمكنك تنزيل مواصفات Streams وضغطها (gzip) مباشرةً في المتصفّح وكتابة الملف المضغوط مباشرةً على القرص.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

تُعدّ واجهة برمجة التطبيقات File System Access API، وFileSystemWritableFileStream، وfetch() طلبات البث التجريبية أمثلة على عمليات البث القابلة للكتابة في العالم الحقيقي.

تستخدم Serial API بشكل كبير كلاً من عمليات القراءة والكتابة.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

أخيرًا، تدمج واجهة برمجة التطبيقات WebSocketStream البث مع WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

مراجع مفيدة

الإقرارات

تمت مراجعة هذه المقالة من قِبل جيك أرشيبالد و فرانسوا بوفورت و سام داتون و ماتياس بولينز و Surma و جو ميدلي و آدم رايس. لقد ساعدتني منشورات المدونة التي كتبها Jake Archibald كثيرًا في فهم عمليات البث. بعض نماذج الرموز مستوحاة من استكشافات المستخدم bellbind@ على GitHub، كما أنّ أجزاء من النص تستند بشكل كبير إلى مستندات MDN Web Docs حول عمليات البث. لقد بذل مؤلفو معيار Streams جهدًا كبيرًا في كتابة هذه المواصفات. الصورة الرئيسية من ريان لارا على Unsplash.