جریان - راهنمای قطعی

با استفاده از Streams API، نحوه استفاده از جریان‌های قابل خواندن، قابل نوشتن و تبدیل را بیاموزید.

Streams API به شما این امکان را می‌دهد که به صورت برنامه‌نویسی به جریان‌های داده‌ای که از طریق شبکه دریافت می‌شوند یا با هر وسیله محلی ایجاد شده‌اند دسترسی داشته باشید و آنها را با جاوا اسکریپت پردازش کنید. استریم شامل تجزیه منبعی است که می‌خواهید دریافت کنید، ارسال کنید یا به تکه‌های کوچک تبدیل کنید و سپس این تکه‌ها را ذره ذره پردازش کنید. در حالی که پخش جریانی کاری است که مرورگرها هنگام دریافت دارایی‌هایی مانند HTML یا ویدیوها برای نمایش در صفحات وب انجام می‌دهند، این قابلیت قبل از اینکه fetch با استریم‌ها در سال 2015 معرفی شود، هرگز برای جاوا اسکریپت در دسترس نبوده است.

قبلاً، اگر می‌خواستید یک منبع را پردازش کنید (چه ویدیو باشد یا یک فایل متنی و غیره)، باید کل فایل را دانلود کنید، منتظر بمانید تا به فرمت مناسب تبدیل شود و سپس پردازش کنید. آن را با در دسترس بودن جریان ها برای جاوا اسکریپت، همه اینها تغییر می کند. اکنون می‌توانید داده‌های خام را با جاوا اسکریپت به‌محض اینکه در کلاینت در دسترس قرار گرفت، بدون نیاز به ایجاد بافر، رشته یا حباب، به تدریج پردازش کنید. این کار تعدادی از موارد استفاده را باز می کند که برخی از آنها را در زیر لیست می کنم:

  • جلوه های ویدیویی: لوله گذاری یک جریان ویدیویی قابل خواندن از طریق یک جریان تبدیل که جلوه ها را در زمان واقعی اعمال می کند.
  • فشرده سازی داده ها: لوله گذاری یک جریان فایل از طریق یک جریان تبدیل که به طور انتخابی آن را فشرده می کند.
  • رمزگشایی تصویر: لوله گذاری یک جریان پاسخ HTTP از طریق یک جریان تبدیل که بایت ها را به داده های بیت مپ رمزگشایی می کند، و سپس از طریق جریان تبدیل دیگری که بیت مپ ها را به PNG ترجمه می کند. اگر در داخل fetch یک سرویس‌کار نصب شده باشد، این به شما امکان می‌دهد فرمت‌های تصویر جدید مانند AVIF را به صورت شفاف پر کنید.

پشتیبانی از مرورگر

ReadableStream و WritableStream

پشتیبانی مرورگر

  • کروم: 43.
  • لبه: 14.
  • فایرفاکس: 65.
  • سافاری: 10.1.

منبع

TransformStream

پشتیبانی مرورگر

  • کروم: 67.
  • لبه: 79.
  • فایرفاکس: 102.
  • سافاری: 14.1.

منبع

مفاهیم اصلی

قبل از اینکه به جزئیات انواع مختلف جریان ها بپردازم، اجازه دهید برخی از مفاهیم اصلی را معرفی کنم.

تکه ها

تکه تکه ای از داده است که در یک جریان نوشته می شود یا از آن خوانده می شود. می تواند از هر نوع باشد؛ جریان ها حتی می توانند شامل تکه هایی از انواع مختلف باشند. در بیشتر مواقع، یک قطعه اتمی ترین واحد داده برای یک جریان معین نخواهد بود. به عنوان مثال، یک جریان بایت ممکن است شامل تکه هایی از 16 کیلوبایت واحد Uint8Array به جای بایت های تک باشد.

جریان های خواندنی

یک جریان قابل خواندن نشان دهنده منبع داده ای است که می توانید از آن مطالعه کنید. به عبارت دیگر، داده ها از یک جریان قابل خواندن خارج می شوند . به طور مشخص، یک جریان قابل خواندن نمونه ای از کلاس ReadableStream است.

جریان های قابل نوشتن

یک جریان قابل نوشتن نشان دهنده مقصدی برای داده است که می توانید در آن بنویسید. به عبارت دیگر، داده ها به یک جریان قابل نوشتن وارد می شوند . به طور مشخص، یک جریان قابل نوشتن نمونه ای از کلاس WritableStream است.

جریان ها را متحول کنید

یک جریان تبدیل از یک جفت جریان تشکیل شده است: یک جریان قابل نوشتن، که به عنوان سمت قابل نوشتن آن شناخته می شود، و یک جریان قابل خواندن، که به عنوان سمت قابل خواندن آن شناخته می شود. یک استعاره در دنیای واقعی برای این می‌تواند یک مترجم همزمان باشد که از یک زبان به زبان دیگر در حال پرواز ترجمه می‌کند. به روشی خاص برای جریان تبدیل، نوشتن در سمت قابل نوشتن منجر به ارائه داده های جدید برای خواندن از سمت قابل خواندن می شود. بطور مشخص، هر شیئی با خاصیت writable و قابلیت readable می تواند به عنوان یک جریان تبدیل عمل کند. با این حال، کلاس TransformStream استاندارد، ایجاد چنین جفتی را که به درستی در هم پیچیده شده است، آسان تر می کند.

زنجیر لوله

جریان ها عمدتاً با لوله کشی آنها به یکدیگر استفاده می شوند. یک جریان قابل خواندن را می توان با استفاده از روش pipeTo() جریان قابل خواندن، مستقیماً به یک جریان قابل نوشتن لوله کرد، یا می توان آن را ابتدا از طریق یک یا چند جریان تبدیل با استفاده از روش pipeThrough() جریان خواندنی هدایت کرد. مجموعه‌ای از جریان‌ها که به این روش با هم لوله می‌شوند، زنجیره لوله نامیده می‌شوند.

پس فشار

هنگامی که یک زنجیره لوله ساخته می شود، سیگنال هایی در مورد اینکه تکه ها با چه سرعتی باید در آن جریان داشته باشند منتشر می کند. اگر هر مرحله‌ای در زنجیره هنوز نتواند تکه‌ها را بپذیرد، سیگنالی را به سمت عقب از طریق زنجیره لوله منتشر می‌کند تا در نهایت به منبع اصلی گفته شود که تولید تکه‌ها را با این سرعت متوقف کند. این فرآیند عادی سازی جریان، فشار برگشتی نامیده می شود.

تیینگ

یک جریان خوانا را می‌توان با استفاده از روش tee() تِید کرد (نام آن بر اساس شکل حرف T بزرگ). با این کار جریان قفل می شود، یعنی دیگر قابل استفاده مستقیم نیست. با این حال، دو جریان جدید به نام شاخه ایجاد می کند که می تواند به طور مستقل مصرف شود. Teeing همچنین مهم است زیرا جریان ها را نمی توان بازگردانی یا راه اندازی مجدد کرد، در ادامه در مورد این موضوع بیشتر توضیح خواهیم داد.

نمودار یک زنجیره لوله متشکل از یک جریان قابل خواندن که از یک فراخوانی به API واکشی می‌آید که سپس از طریق یک جریان تبدیل که خروجی آن تید می‌شود، لوله می‌شود و سپس برای اولین جریان قابل خواندن نتیجه به مرورگر ارسال می‌شود و به کش سرویس کار برای دومین جریان قابل خواندن نتیجه
یک زنجیر لوله

مکانیک یک جریان قابل خواندن

یک جریان قابل خواندن یک منبع داده است که در جاوا اسکریپت توسط یک شی ReadableStream که از یک منبع زیرین جریان می یابد، نمایش داده می شود. سازنده ReadableStream() یک شی جریان قابل خواندن را از کنترل کننده های داده شده ایجاد و برمی گرداند. دو نوع منبع اساسی وجود دارد:

  • هنگامی که به آنها دسترسی پیدا کردید، منابع فشار دائماً داده ها را به سمت شما می فرستند، و شروع، توقف یا لغو دسترسی به جریان به عهده شماست. به عنوان مثال می توان به جریان های ویدیویی زنده، رویدادهای ارسال شده توسط سرور یا WebSockets اشاره کرد.
  • منابع کششی از شما می‌خواهند که پس از اتصال به آنها صریحاً داده‌هایی را از آنها درخواست کنید. به عنوان مثال می توان به عملیات HTTP از طریق تماس های fetch() یا XMLHttpRequest اشاره کرد.

داده های جریان به صورت متوالی در قطعات کوچک به نام تکه خوانده می شوند. گفته می شود که تکه هایی که در یک جریان قرار می گیرند در صف قرار می گیرند . این بدان معنی است که آنها در یک صف آماده برای خواندن منتظر هستند. یک صف داخلی تکه هایی را که هنوز خوانده نشده اند ردیابی می کند.

یک استراتژی صف شیئی است که تعیین می کند چگونه یک جریان باید بر اساس وضعیت صف داخلی خود سیگنال فشار برگشتی را نشان دهد. استراتژی صف بندی یک اندازه به هر تکه اختصاص می دهد و اندازه کل همه تکه های صف را با یک عدد مشخص مقایسه می کند که به عنوان علامت آب بالا شناخته می شود.

تکه های داخل جریان توسط خواننده خوانده می شود. این خواننده داده ها را یک تکه در یک زمان بازیابی می کند و به شما امکان می دهد هر نوع عملیاتی را که می خواهید روی آن انجام دهید. خواننده به اضافه کد پردازش دیگری که همراه با آن است، مصرف کننده نامیده می شود.

ساختار بعدی در این زمینه کنترل کننده نامیده می شود. هر جریان قابل خواندن دارای یک کنترلر مرتبط است که همانطور که از نام آن پیداست به شما امکان می دهد جریان را کنترل کنید.

فقط یک خواننده می تواند یک جریان را در یک زمان بخواند. هنگامی که یک خواننده ایجاد می شود و شروع به خواندن یک جریان می کند (یعنی خواننده فعال می شود)، روی آن قفل می شود. اگر می‌خواهید خواننده دیگری مسئولیت خواندن جریان شما را بر عهده بگیرد، معمولاً باید قبل از انجام هر کار دیگری، اولین خواننده را آزاد کنید (اگرچه می‌توانید استریم‌ها را تیپ کنید ).

ایجاد یک جریان قابل خواندن

شما با فراخوانی سازنده آن ReadableStream() یک جریان قابل خواندن ایجاد می کنید. سازنده یک آرگومان اختیاری underlyingSource دارد، که یک شی را با متدها و ویژگی هایی نشان می دهد که نحوه رفتار نمونه جریان ساخته شده را مشخص می کند.

منبع underlyingSource

این می تواند از روش های اختیاری و تعریف شده توسط توسعه دهنده زیر استفاده کند:

  • start(controller) : بلافاصله پس از ساخت شیء فراخوانی می شود. این روش می‌تواند به منبع جریان دسترسی داشته باشد، و هر کار دیگری که برای تنظیم عملکرد جریان لازم است انجام دهد. اگر قرار باشد این فرآیند به صورت ناهمزمان انجام شود، روش می تواند یک وعده را به نشانه موفقیت یا شکست برگرداند. پارامتر controller ارسال شده به این روش یک ReadableStreamDefaultController است.
  • pull(controller) : می توان از آن برای کنترل جریان استفاده کرد زیرا تکه های بیشتری واکشی می شوند. تا زمانی که صف داخلی تکه های جریان پر نباشد، به طور مکرر فراخوانی می شود، تا زمانی که صف به علامت آب بالای خود برسد. اگر نتیجه فراخوانی pull() یک وعده باشد، pull() دوباره فراخوانی نخواهد شد تا زمانی که قول گفته شده محقق شود. اگر قول رد شود، جریان دچار خطا می شود.
  • cancel(reason) : هنگامی که مصرف کننده جریان جریان را لغو می کند، تماس می گیرد.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController از روش های زیر پشتیبانی می کند:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

دومین آرگومان سازنده سازنده ReadableStream() نیز اختیاری است queuingStrategy . این یک شی است که به صورت اختیاری یک استراتژی صف برای جریان تعریف می کند که دو پارامتر دارد:

  • highWaterMark : یک عدد غیر منفی که نشان دهنده علامت بالای آب جریان با استفاده از این استراتژی صف بندی است.
  • size(chunk) : تابعی که اندازه غیر منفی متناهی مقدار قطعه داده شده را محاسبه و برمی گرداند. نتیجه برای تعیین فشار برگشتی استفاده می‌شود که از طریق ویژگی ReadableStreamDefaultController.desiredSize مناسب آشکار می‌شود. همچنین زمانی که متد pull() منبع زیربنایی فراخوانی شود، حاکم است.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

متدهای getReader() و read()

برای خواندن از یک جریان قابل خواندن، به یک خواننده نیاز دارید که ReadableStreamDefaultReader خواهد بود. متد getReader() رابط ReadableStream یک خواننده ایجاد می کند و جریان را روی آن قفل می کند. در حالی که جریان قفل است، تا زمانی که این یکی منتشر نشود، نمی توان خواننده دیگری را به دست آورد.

متد read() واسط ReadableStreamDefaultReader قولی را برمی‌گرداند که به قسمت بعدی در صف داخلی جریان دسترسی پیدا می‌کند. بسته به وضعیت جریان، نتیجه ای را برآورده یا رد می کند. احتمالات مختلف به شرح زیر است:

  • اگر تکه ای موجود باشد، قول با یک شی از فرم محقق می شود
    { value: chunk, done: false } .
  • اگر جریان بسته شود، قول با یک شی از شکل محقق می شود
    { value: undefined, done: true } .
  • در صورت خطا شدن جریان، قول با خطای مربوطه رد می شود.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

ملک locked

با دسترسی به ویژگی ReadableStream.locked آن، می توانید بررسی کنید که آیا یک جریان قابل خواندن قفل شده است.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

نمونه کد جریان قابل خواندن

نمونه کد زیر تمام مراحل را در عمل نشان می دهد. ابتدا یک ReadableStream ایجاد می کنید که در آرگومان underlyingSource آن (یعنی کلاس TimestampSource ) یک متد start() را تعریف می کند. این متد controller استریم می گوید که هر ثانیه در طول ده ثانیه یک timestamp enqueue() . در نهایت، به کنترل کننده می گوید که جریان را close() . شما این جریان را با ایجاد یک خواننده از طریق متد getReader() و فراخوانی read() مصرف می کنید تا پخش جریانی done .

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

تکرار ناهمزمان

بررسی هر تکرار حلقه read() در صورت done استریم ممکن است راحت‌ترین API نباشد. خوشبختانه به زودی راه بهتری برای انجام این کار وجود خواهد داشت: تکرار ناهمزمان.

for await (const chunk of stream) {
  console.log(chunk);
}

امروزه یک راه حل برای استفاده از تکرار ناهمزمان، پیاده سازی رفتار با پلی پر است.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

پخش جریانی خواندنی

متد tee() واسط ReadableStream جریان قابل خواندن فعلی را به تصویر می کشد و یک آرایه دو عنصری حاوی دو شاخه به دست آمده را به عنوان نمونه های ReadableStream جدید برمی گرداند. این به دو خواننده اجازه می دهد تا یک جریان را به طور همزمان بخوانند. برای مثال، اگر می‌خواهید پاسخی را از سرور دریافت کنید و آن را به مرورگر پخش کنید، و همچنین آن را به کش سرویس‌کار نیز پخش کنید، ممکن است این کار را انجام دهید. از آنجایی که یک بدنه پاسخ نمی تواند بیش از یک بار مصرف شود، برای انجام این کار به دو نسخه نیاز دارید. برای لغو جریان، باید هر دو شاخه حاصل را لغو کنید. پخش جریانی معمولاً آن را برای مدت زمان قفل می کند و از قفل کردن آن توسط سایر خوانندگان جلوگیری می کند.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

جریان های بایت قابل خواندن

برای جریان‌هایی که بایت‌ها را نشان می‌دهند، یک نسخه توسعه‌یافته از جریان قابل خواندن برای مدیریت کارآمد بایت‌ها، به ویژه با به حداقل رساندن کپی‌ها، ارائه شده است. جریان‌های بایتی اجازه می‌دهند تا خوانندگان بافر خود را بیاورید (BYOB). اجرای پیش‌فرض می‌تواند طیفی از خروجی‌های مختلف مانند رشته‌ها یا بافرهای آرایه را در مورد WebSockets ارائه دهد، در حالی که جریان‌های بایت خروجی بایت را تضمین می‌کنند. علاوه بر این، خوانندگان BYOB مزایای پایداری دارند. این به این دلیل است که اگر یک بافر جدا شود، می تواند تضمین کند که یک بافر دو بار در یک بافر نوشته نمی شود، بنابراین از شرایط مسابقه اجتناب می شود. خواننده های BYOB می توانند تعداد دفعاتی را که مرورگر برای اجرای جمع آوری زباله نیاز دارد کاهش دهد، زیرا می تواند از بافرها استفاده مجدد کند.

ایجاد یک جریان بایت قابل خواندن

شما می توانید با ارسال یک پارامتر type اضافی به سازنده ReadableStream() یک جریان بایت قابل خواندن ایجاد کنید.

new ReadableStream({ type: 'bytes' });

منبع underlyingSource

منبع اصلی یک جریان بایت قابل خواندن یک ReadableByteStreamController برای دستکاری داده می شود. متد ReadableByteStreamController.enqueue() آن یک آرگومان chunk می گیرد که مقدار آن ArrayBufferView است. ویژگی ReadableByteStreamController.byobRequest درخواست کشش فعلی BYOB را برمی‌گرداند یا اگر وجود نداشته باشد، تهی است. در نهایت، ویژگی ReadableByteStreamController.desiredSize اندازه مورد نظر را برای پر کردن صف داخلی جریان کنترل شده برمی گرداند.

queuingStrategy

دومین آرگومان سازنده سازنده ReadableStream() نیز اختیاری است queuingStrategy . این یک شی است که به صورت اختیاری یک استراتژی صف برای جریان تعریف می کند که یک پارامتر را می گیرد:

  • highWaterMark : تعداد غیرمنفی بایت‌ها که نشان‌دهنده علامت آبی بالای جریان با استفاده از این استراتژی صف‌بندی است. این برای تعیین فشار برگشتی استفاده می شود که از طریق ویژگی ReadableByteStreamController.desiredSize مناسب نشان داده می شود. همچنین زمانی که متد pull() منبع زیربنایی فراخوانی شود، حاکم است.

متدهای getReader() و read()

سپس می توانید با تنظیم پارامتر mode مطابق با آن، به ReadableStreamBYOBReader دسترسی پیدا کنید: ReadableStream.getReader({ mode: "byob" }) . این امکان کنترل دقیق تری بر تخصیص بافر را فراهم می کند تا از کپی جلوگیری شود. برای خواندن از جریان بایت، باید ReadableStreamBYOBReader.read(view) فراخوانی کنید، جایی که view یک ArrayBufferView است.

نمونه کد جریان بایت قابل خواندن

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

تابع زیر جریان‌های بایت قابل خواندن را برمی‌گرداند که امکان خواندن کارآمد صفر کپی آرایه‌ای که به‌طور تصادفی تولید می‌شود را فراهم می‌کند. به جای استفاده از اندازه قطعه از پیش تعیین شده 1024، سعی می کند بافر ارائه شده توسط توسعه دهنده را پر کند و امکان کنترل کامل را فراهم می کند.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

مکانیک یک جریان قابل نوشتن

یک جریان قابل نوشتن مقصدی است که می‌توانید داده‌هایی را در آن بنویسید که در جاوا اسکریپت توسط یک شی WritableStream نمایش داده می‌شود. این به عنوان یک انتزاع در بالای یک سینک زیرین عمل می کند - یک ورودی/خروجی سطح پایین تر که داده های خام در آن نوشته می شود.

داده‌ها از طریق یک رایتر ، یک تکه در یک زمان، روی جریان نوشته می‌شوند. یک تکه می تواند اشکال مختلفی داشته باشد، درست مانند تکه های یک خواننده. می توانید از هر کدی که دوست دارید برای تولید تکه های آماده برای نوشتن استفاده کنید. نویسنده به اضافه کد مرتبط تولید کننده نامیده می شود.

هنگامی که یک نویسنده ایجاد می شود و شروع به نوشتن در یک جریان (یک نویسنده فعال ) می کند، گفته می شود که به آن قفل شده است. فقط یک نویسنده می تواند در یک جریان قابل نوشتن در یک زمان بنویسد. اگر می‌خواهید نویسنده دیگری شروع به نوشتن در جریان شما کند، معمولاً باید آن را منتشر کنید، قبل از اینکه نویسنده دیگری را به آن وصل کنید.

یک صف داخلی تکه هایی را که در جریان نوشته شده اند اما هنوز توسط سینک زیرین پردازش نشده اند، پیگیری می کند.

یک استراتژی صف شیئی است که تعیین می کند چگونه یک جریان باید بر اساس وضعیت صف داخلی خود سیگنال فشار برگشتی را نشان دهد. استراتژی صف بندی یک اندازه به هر تکه اختصاص می دهد و اندازه کل همه تکه های صف را با یک عدد مشخص مقایسه می کند که به عنوان علامت آب بالا شناخته می شود.

ساختار نهایی یک کنترل کننده نامیده می شود. هر جریان قابل نوشتن دارای یک کنترلر مرتبط است که به شما امکان می دهد جریان را کنترل کنید (به عنوان مثال، آن را لغو کنید).

ایجاد یک جریان قابل نوشتن

رابط WritableStream از Streams API یک انتزاع استاندارد برای نوشتن داده های جریانی در یک مقصد ارائه می دهد که به عنوان سینک شناخته می شود. این شی با فشار پشتی و صف داخلی همراه است. شما یک جریان قابل نوشتن با فراخوانی سازنده آن WritableStream() ایجاد می کنید. دارای یک پارامتر underlyingSink اختیاری است که یک شی را با روش ها و ویژگی هایی نشان می دهد که نحوه رفتار نمونه جریان ساخته شده را مشخص می کند.

سینک underlyingSink

underlyingSink می‌تواند شامل روش‌های اختیاری و تعریف‌شده توسط توسعه‌دهنده زیر باشد. پارامتر controller ارسال شده به برخی از متدها WritableStreamDefaultController است.

  • start(controller) : این متد بلافاصله پس از ساخت شیء فراخوانی می شود. محتوای این روش باید به منظور دسترسی به سینک زیرین باشد. اگر قرار باشد این فرآیند به صورت ناهمزمان انجام شود، می تواند یک وعده را به نشانه موفقیت یا شکست برگرداند.
  • write(chunk, controller) : این روش زمانی فراخوانی می شود که یک قطعه جدید از داده (که در پارامتر chunk مشخص شده است) برای نوشتن در سینک زیرین آماده باشد. می تواند یک وعده را به سیگنال موفقیت یا شکست عملیات نوشتن بازگرداند. این روش تنها پس از موفقیت در نوشتن قبلی فراخوانی می شود، و هرگز پس از بسته شدن یا قطع شدن جریان.
  • close(controller) : این روش در صورتی فراخوانی می شود که برنامه سیگنال دهد که نوشتن تکه ها در جریان به پایان رسیده است. محتویات باید هر کاری را که لازم است برای نهایی کردن نوشته‌ها در سینک زیرین و آزاد کردن دسترسی به آن انجام دهند. اگر این فرآیند ناهمزمان باشد، می تواند یک وعده را به نشانه موفقیت یا شکست بازگرداند. این متد تنها پس از موفقیت آمیز بودن تمام نوشتن های در صف فراخوانی می شود.
  • abort(reason) : این روش در صورتی فراخوانی می شود که برنامه سیگنال دهد که می خواهد به طور ناگهانی جریان را ببندد و آن را در حالت خطا قرار دهد. می تواند هر منبع نگهداری شده را پاک کند، مانند close() ، اما abort() حتی اگر نوشته ها در صف قرار گیرند فراخوانی می شود. آن تکه ها دور ریخته خواهند شد. اگر این فرآیند ناهمزمان باشد، می تواند یک وعده را به نشانه موفقیت یا شکست بازگرداند. پارامتر reason حاوی یک DOMString است که علت قطع جریان را توضیح می دهد.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

رابط WritableStreamDefaultController در Streams API کنترل‌کننده‌ای را نشان می‌دهد که امکان کنترل وضعیت WritableStream را در حین راه‌اندازی می‌دهد، زیرا تکه‌های بیشتری برای نوشتن ارسال می‌شوند، یا در پایان نوشتن. هنگام ساخت یک WritableStream ، به سینک زیرین یک نمونه WritableStreamDefaultController مربوطه داده می شود تا آن را دستکاری کند. WritableStreamDefaultController تنها یک روش دارد: WritableStreamDefaultController.error() که هر گونه تعاملات آتی با جریان مرتبط را با خطا مواجه می کند. WritableStreamDefaultController همچنین از ویژگی signal پشتیبانی می‌کند که نمونه‌ای از AbortSignal را برمی‌گرداند و اجازه می‌دهد عملیات WritableStream در صورت نیاز متوقف شود.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

آرگومان دوم سازنده WritableStream() نیز اختیاری است queuingStrategy . این یک شی است که به صورت اختیاری یک استراتژی صف برای جریان تعریف می کند که دو پارامتر دارد:

  • highWaterMark : یک عدد غیر منفی که نشان دهنده علامت بالای آب جریان با استفاده از این استراتژی صف بندی است.
  • size(chunk) : تابعی که اندازه غیر منفی متناهی مقدار قطعه داده شده را محاسبه و برمی گرداند. از نتیجه برای تعیین فشار برگشتی استفاده می‌شود که از طریق ویژگی WritableStreamDefaultWriter.desiredSize مناسب آشکار می‌شود.

متدهای getWriter() و write()

برای نوشتن در یک جریان قابل نوشتن، به یک نویسنده نیاز دارید که WritableStreamDefaultWriter باشد. متد getWriter() رابط WritableStream یک نمونه جدید از WritableStreamDefaultWriter را برمی گرداند و جریان را روی آن نمونه قفل می کند. در حالی که جریان قفل است، تا زمانی که نسخه فعلی منتشر نشود، نمی توان نویسنده دیگری را به دست آورد.

متد write() واسط WritableStreamDefaultWriter یک تکه داده ارسال شده را به WritableStream و سینک زیرین آن می نویسد، سپس یک وعده را برمی گرداند که موفقیت یا شکست عملیات نوشتن را نشان می دهد. توجه داشته باشید که "موفقیت" به معنای آن است. ممکن است نشان دهد که این قطعه پذیرفته شده است، و نه لزوماً اینکه به طور ایمن به مقصد نهایی خود ذخیره شده است.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

ملک locked

با دسترسی به ویژگی WritableStream.locked آن، می‌توانید بررسی کنید که آیا یک جریان قابل نوشتن قفل شده است.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

نمونه کد جریان قابل نوشتن

نمونه کد زیر تمام مراحل را در عمل نشان می دهد.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

انتقال یک جریان قابل خواندن به یک جریان قابل نوشتن

یک جریان قابل خواندن را می توان از طریق روش pipeTo() جریان قابل خواندن به یک جریان قابل نوشتن لوله کرد. ReadableStream.pipeTo() ReadableStream کنونی را به یک WritableStream داده شده لوله می کند و وعده ای را برمی گرداند که زمانی که فرآیند لوله کشی با موفقیت کامل شد محقق می شود، یا در صورت بروز هرگونه خطا، آن را رد می کند.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

ایجاد یک جریان تبدیل

رابط TransformStream از Streams API مجموعه ای از داده های قابل تبدیل را نشان می دهد. شما یک جریان تبدیل را با فراخوانی سازنده آن TransformStream() ایجاد می کنید که یک شی جریان تبدیل را از کنترل کننده های داده شده ایجاد و برمی گرداند. سازنده TransformStream() به عنوان اولین آرگومان خود یک شی جاوا اسکریپت اختیاری که transformer را نشان می دهد می پذیرد. چنین اشیایی می توانند شامل یکی از روش های زیر باشند:

transformer

  • start(controller) : این متد بلافاصله پس از ساخت شیء فراخوانی می شود. معمولاً از این برای قرار دادن تکه‌های پیشوند با استفاده از controller.enqueue() استفاده می‌شود. این تکه ها از سمت قابل خواندن خوانده می شوند اما به هیچ نوشته ای در سمت قابل نوشتن بستگی ندارند. اگر این فرآیند اولیه ناهمزمان باشد، برای مثال به این دلیل که برای بدست آوردن تکه‌های پیشوند مقداری تلاش لازم است، تابع می‌تواند یک وعده را به سیگنال موفقیت یا شکست برگرداند. یک وعده رد شده جریان را با خطا مواجه می کند. هر استثنای پرتاب شده توسط سازنده TransformStream() دوباره پرتاب می شود.
  • transform(chunk, controller) : این روش زمانی فراخوانی می شود که یک قطعه جدید که در ابتدا در سمت قابل نوشتن نوشته شده بود آماده تبدیل شود. اجرای جریان تضمین می‌کند که این تابع تنها پس از موفقیت در تبدیل‌های قبلی فراخوانی می‌شود، و هرگز قبل از اتمام start() یا بعد از flush() فراخوانی نمی‌شود. این تابع کار تبدیل واقعی جریان تبدیل را انجام می دهد. می تواند نتایج را با استفاده از controller.enqueue() در صف قرار دهد. این اجازه می‌دهد که یک تکه تکه نوشته شده در سمت قابل نوشتن، بسته به تعداد دفعاتی که controller.enqueue() فراخوانی شود، به صفر یا چند تکه در سمت قابل خواندن منجر شود. اگر فرآیند تبدیل ناهمزمان باشد، این تابع می تواند یک وعده را به سیگنال موفقیت یا شکست تبدیل بازگرداند. یک وعده رد شده هر دو طرف قابل خواندن و نوشتن جریان تبدیل را دچار خطا می کند. اگر متد transform() ارائه نشده باشد، تبدیل هویت استفاده می‌شود که تکه‌های بدون تغییر از سمت قابل نوشتن به سمت قابل خواندن را در صف قرار می‌دهد.
  • flush(controller) : این متد پس از اینکه تمام تکه های نوشته شده در سمت قابل نوشتن با عبور موفقیت آمیز از transform() تبدیل شده اند و سمت قابل نوشتن در شرف بسته شدن است فراخوانی می شود. معمولاً از این برای قرار دادن تکه‌های پسوند در سمت قابل خواندن، قبل از بسته شدن استفاده می‌شود. اگر فرآیند فلاشینگ ناهمزمان باشد، تابع می تواند یک وعده را به سیگنال موفقیت یا شکست برگرداند. نتیجه به تماس گیرنده stream.writable.write() اطلاع رسانی خواهد شد. علاوه بر این، یک وعده رد شده هر دو طرف قابل خواندن و نوشتن جریان را با خطا مواجه می کند. پرتاب یک استثنا مانند بازگرداندن قول رد شده تلقی می شود.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

استراتژی های writableStrategy و readableStrategy صف بندی

دومین و سومین پارامتر اختیاری سازنده TransformStream() عبارتند از استراتژی های writableStrategy اختیاری و صف بندی readableStrategy . آنها به ترتیب در بخش‌های جریان خواندنی و قابل نوشتن مشخص شده‌اند.

تبدیل نمونه کد جریان

نمونه کد زیر یک جریان تبدیل ساده را در عمل نشان می دهد.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

لوله گذاری یک جریان قابل خواندن از طریق یک جریان تبدیل

متد pipeThrough() رابط ReadableStream یک روش زنجیره‌ای برای لوله‌کشی جریان جاری از طریق یک جریان تبدیل یا هر جفت قابل نوشتن/خواندن دیگری ارائه می‌کند. لوله کشی یک جریان عموماً آن را در طول مدت لوله قفل می کند و از قفل شدن آن توسط سایر خوانندگان جلوگیری می کند.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

نمونه کد بعدی (کمی ساختگی) نشان می‌دهد که چگونه می‌توانید یک نسخه «فریاد» از fetch() را پیاده‌سازی کنید که تمام متن را با استفاده از قول پاسخ برگشتی به‌عنوان یک جریان و بزرگ‌کردن تکه به تکه، بزرگ‌نمایی می‌کند. مزیت این روش این است که شما نیازی به صبر کردن برای دانلود کل سند ندارید، که می تواند تفاوت بزرگی در هنگام کار با فایل های بزرگ ایجاد کند.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

نسخه ی نمایشی

دمو زیر جریان های قابل خواندن، قابل نوشتن و تبدیل را در عمل نشان می دهد. همچنین شامل نمونه‌هایی از زنجیره‌های لوله pipeThrough() و pipeTo() می‌شود و همچنین tee() را نشان می‌دهد. شما می توانید به صورت اختیاری نسخه ی نمایشی را در پنجره خود اجرا کنید یا کد منبع را مشاهده کنید.

جریان های مفید موجود در مرورگر

تعدادی جریان مفید مستقیماً در مرورگر تعبیه شده است. شما به راحتی می توانید یک ReadableStream از یک حباب ایجاد کنید. متد stream() واسط Blob یک ReadableStream را برمی گرداند که پس از خواندن داده های موجود در blob را برمی گرداند. همچنین به یاد داشته باشید که یک شی File نوع خاصی از Blob است و می تواند در هر زمینه ای که یک blob می تواند استفاده شود.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

انواع جریان TextDecoder.decode() و TextEncoder.encode() به ترتیب TextDecoderStream و TextEncoderStream نامیده می شوند.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

فشرده سازی یا از حالت فشرده خارج کردن یک فایل به ترتیب با جریان های تبدیل CompressionStream و DecompressionStream آسان است. نمونه کد زیر نشان می دهد که چگونه می توانید مشخصات Streams را دانلود کنید، آن را مستقیماً در مرورگر فشرده (gzip) کنید و فایل فشرده شده را مستقیماً روی دیسک بنویسید.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream API دسترسی به فایل سیستم و جریان های درخواست fetch() نمونه هایی از جریان های قابل نوشتن در طبیعت هستند.

Serial API از هر دو جریان قابل خواندن و نوشتن استفاده زیادی می کند.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

در نهایت، WebSocketStream API جریان ها را با WebSocket API یکپارچه می کند.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

منابع مفید

قدردانی

این مقاله توسط جیک آرچیبالد ، فرانسوا بوفور ، سام داتون ، ماتیاس بوئلنس ، سورما ، جو مدلی و آدام رایس بررسی شده است. پست های وبلاگ جیک آرچیبالد به من در درک جریان ها کمک زیادی کرده است. برخی از نمونه‌های کد از کاوش‌های کاربر GitHub @bellbind الهام گرفته شده‌اند و بخش‌هایی از نثر به شدت بر روی اسناد وب MDN در Streams ساخته شده‌اند. نویسندگان Streams Standard کار فوق العاده ای در نوشتن این مشخصات انجام داده اند. تصویر قهرمان توسط رایان لارا در Unsplash .