สตรีม—คู่มือฉบับสมบูรณ์

ดูวิธีใช้สตรีมที่อ่านได้ เขียนได้ และเปลี่ยนรูปแบบด้วย Streams API

Streams API ช่วยให้คุณเข้าถึงสตรีมข้อมูลที่รับผ่านเครือข่ายหรือสร้างขึ้นด้วยวิธีใดก็ได้ในเครื่องแบบเป็นโปรแกรม และประมวลผลด้วย JavaScript สตรีมมิงเกี่ยวข้องกับการแบ่งทรัพยากรที่คุณต้องการรับ ส่ง หรือเปลี่ยนรูปแบบเป็นกลุ่มเล็กๆ แล้วประมวลผลกลุ่มเหล่านี้ทีละส่วน แม้ว่าการสตรีมเป็นสิ่งที่เบราว์เซอร์ทำอยู่แล้วเมื่อได้รับชิ้นงาน เช่น HTML หรือวิดีโอเพื่อแสดงในหน้าเว็บ แต่ JavaScript ไม่เคยมีความสามารถนี้มาก่อนจนกระทั่งมีการเปิดตัว fetch ที่มีสตรีมในปี 2015

ก่อนหน้านี้ หากต้องการประมวลผลทรัพยากรบางประเภท (ไม่ว่าจะเป็นวิดีโอหรือไฟล์ข้อความ ฯลฯ) คุณจะต้องดาวน์โหลดไฟล์ทั้งไฟล์ รอให้ไฟล์ได้รับการแปลงเป็นรูปแบบที่เหมาะสม แล้วจึงประมวลผล เมื่อสตรีมพร้อมใช้งานสำหรับ JavaScript ทุกอย่างก็เปลี่ยนไป ตอนนี้คุณสามารถประมวลผลข้อมูลดิบด้วย JavaScript แบบเป็นขั้นเป็นตอนได้ทันทีที่ข้อมูลพร้อมใช้งานบนไคลเอ็นต์ โดยไม่ต้องสร้างบัฟเฟอร์ สตรีง หรือบล็อก ซึ่งจะเปิดโอกาสให้ใช้ Use Case หลายรายการดังที่ระบุไว้ด้านล่าง

  • เอฟเฟกต์วิดีโอ: การส่งผ่านสตรีมวิดีโอที่อ่านได้ผ่านสตรีมการเปลี่ยนรูปแบบที่ใช้เอฟเฟกต์แบบเรียลไทม์
  • การ (บีบอัด/เลิกบีบอัด) ข้อมูล: การส่งผ่านสตรีมไฟล์ผ่านสตรีมการเปลี่ยนรูปแบบที่ (บีบอัด/เลิกบีบอัด) ข้อมูลอย่างมีเลือกสรร
  • การถอดรหัสรูปภาพ: การส่งผ่านสตรีมการตอบกลับ HTTP ผ่านสตรีมการเปลี่ยนรูปแบบซึ่งถอดรหัสไบต์เป็นข้อมูลบิตแมป จากนั้นส่งผ่านสตรีมการเปลี่ยนรูปแบบอีกรายการหนึ่งซึ่งแปลบิตแมปเป็น PNG หากติดตั้งภายในตัวแฮนเดิล fetch ของ Service Worker จะช่วยให้คุณโพลีฟีลรูปแบบรูปภาพใหม่อย่าง AVIF ได้อย่างชัดเจน

การสนับสนุนเบราว์เซอร์

ReadableStream และ WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

แนวคิดหลัก

ก่อนจะลงรายละเอียดเกี่ยวกับสตรีมประเภทต่างๆ เราขอแนะนำแนวคิดหลักๆ สักเล็กน้อย

ชิ้นส่วน

ข้อมูลเป็นข้อมูลชิ้นเดียวที่เขียนไปยังหรืออ่านจากสตรีม โดยอาจเป็นประเภทใดก็ได้ สตรีมอาจมีข้อมูลหลายประเภทรวมอยู่ด้วย ส่วนใหญ่แล้ว ข้อมูลส่วนนี้จะไม่ใช่หน่วยข้อมูลย่อยที่สุดสำหรับสตรีมหนึ่งๆ เช่น สตรีมไบต์อาจมีกลุ่มที่ประกอบด้วยหน่วย 16 กิไบต์ Uint8Array แทนที่จะเป็นไบต์เดี่ยว

สตรีมที่อ่านได้

สตรีมที่อ่านได้แสดงแหล่งข้อมูลที่คุณอ่านได้ กล่าวคือ ข้อมูลมาจากสตรีมที่อ่านได้ กล่าวอย่างเป็นรูปธรรมคือ สตรีมที่อ่านได้คืออินสแตนซ์ของReadableStream คลาส

สตรีมที่ใช้เขียนได้

สตรีมที่เขียนได้แสดงปลายทางสำหรับข้อมูลที่คุณสามารถเขียนได้ กล่าวคือ ข้อมูลจะเข้าไปในสตรีมที่เขียนได้ กล่าวโดยละเอียดคือ สตรีมแบบเขียนได้คืออินสแตนซ์ของคลาส WritableStream

เปลี่ยนรูปแบบสตรีม

สตรีมการเปลี่ยนรูปแบบประกอบด้วยสตรีม 2 รายการ ได้แก่ สตรีมที่เขียนได้ ซึ่งเรียกว่าฝั่งที่เขียนได้ และสตรีมที่อ่านได้ ซึ่งเรียกว่าฝั่งที่อ่านได้ อุปมาชีวิตจริงสำหรับฟีเจอร์นี้ก็คือล่ามแบบแปลพร้อมกันซึ่งแปลจากภาษาหนึ่งเป็นภาษาอื่นขณะที่กำลังพูด ในกรณีที่เฉพาะสตรีมการเปลี่ยนรูปแบบ การเขียนไปยังฝั่งที่เขียนได้จะทำให้ข้อมูลใหม่พร้อมใช้งานสำหรับการอ่านจากฝั่งที่อ่านได้ กล่าวโดยละเอียดคือ ออบเจ็กต์ใดก็ตามที่มีพร็อพเพอร์ตี้ writable และพร็อพเพอร์ตี้ readable สามารถทำหน้าที่เป็นสตรีมการเปลี่ยนรูปแบบได้ อย่างไรก็ตาม คลาส TransformStream มาตรฐานช่วยให้สร้างคู่ดังกล่าวที่พันกันได้อย่างถูกต้องได้ง่ายขึ้น

โซ่ท่อ

สตรีมส่วนใหญ่จะใช้เพื่อส่งผ่านไปยังสตรีมอื่น สตรีมที่อ่านได้สามารถส่งผ่านไปยังสตรีมที่เขียนได้โดยตรงโดยใช้เมธอด pipeTo() ของสตรีมที่อ่านได้ หรือจะส่งผ่านสตรีมการเปลี่ยนรูปแบบอย่างน้อย 1 รายการก่อนก็ได้โดยใช้เมธอด pipeThrough() ของสตรีมที่อ่านได้ ชุดสตรีมที่จัดเรียงต่อกันด้วยวิธีนี้เรียกว่า "เชนไปป์"

แรงดันย้อนกลับ

เมื่อสร้างเชนไปป์แล้ว เชนจะส่งสัญญาณเกี่ยวกับความเร็วที่ควรส่งผ่านข้อมูลผ่านเชน หากขั้นตอนใดในเชนยังไม่สามารถรับข้อมูลโค้ดได้ ระบบจะส่งสัญญาณย้อนกลับผ่านเชนไปเรื่อยๆ จนกว่าระบบจะบอกแหล่งที่มาเดิมให้หยุดสร้างข้อมูลโค้ดเร็วเกินไป กระบวนการปรับกระแสให้เป็นไปตามปกตินี้เรียกว่าแรงดันย้อนกลับ

การทำที

สตรีมที่อ่านได้สามารถแยกออกเป็น 2 ทาง (ตั้งชื่อตามรูปร่างของ "T" ตัวพิมพ์ใหญ่) โดยใช้เมธอด tee() ซึ่งจะล็อกสตรีม นั่นคือทำให้ใช้งานโดยตรงไม่ได้อีกต่อไป แต่ระบบจะสร้างสตรีมใหม่ 2 รายการที่เรียกว่าสาขา ซึ่งสามารถใช้งานแยกกันได้ การเริ่มเล่นยังมีความสำคัญด้วยเนื่องจากสตรีมจะกรอกลับหรือเริ่มเล่นใหม่ไม่ได้ เราจะอธิบายเรื่องนี้เพิ่มเติมในภายหลัง

แผนภาพเชนไปป์ประกอบด้วยสตรีมที่อ่านได้ซึ่งมาจากการเรียกใช้ fetch API จากนั้นระบบจะส่งผ่านสตรีมการเปลี่ยนรูปแบบที่มีเอาต์พุตแยกไปให้เบราว์เซอร์สำหรับสตรีมที่อ่านได้ซึ่งแสดงผลเป็นอันดับแรก และส่งไปยังแคช Service Worker สำหรับสตรีมที่อ่านได้ซึ่งแสดงผลเป็นอันดับที่ 2
โซ่ท่อ

กลไกของสตรีมที่อ่านได้

สตรีมที่อ่านได้คือแหล่งข้อมูลที่แสดงใน JavaScript โดยออบเจ็กต์ ReadableStream ที่มาจากแหล่งที่มาพื้นฐาน ตัวสร้างของ ReadableStream() จะสร้างและแสดงผลออบเจ็กต์สตรีมที่อ่านได้จากตัวแฮนเดิลที่ระบุ แหล่งที่มาพื้นฐานมี 2 ประเภท ได้แก่

  • แหล่งข้อมูล Push จะส่งข้อมูลให้คุณอย่างต่อเนื่องเมื่อคุณเข้าถึงแหล่งข้อมูลดังกล่าว และคุณเป็นผู้เลือกว่าจะเริ่ม หยุดชั่วคราว หรือยกเลิกการเข้าถึงสตรีม ตัวอย่างเช่น สตรีมวิดีโอสด เหตุการณ์ที่เซิร์ฟเวอร์ส่ง หรือ WebSocket
  • แหล่งที่มาแบบพุลกำหนดให้คุณขอข้อมูลจากแหล่งที่มาอย่างชัดเจนเมื่อเชื่อมต่อแล้ว ตัวอย่าง ได้แก่ การดำเนินการ HTTP ผ่านการเรียก fetch() หรือ XMLHttpRequest

ระบบจะอ่านข้อมูลสตรีมตามลําดับเป็นชิ้นเล็กๆ ที่เรียกว่าข้อมูลส่วน ข้อมูลโค้ดที่วางในสตรีมจะเรียกว่าจัดคิว ซึ่งหมายความว่าคำสั่งซื้อดังกล่าวกำลังรออยู่ในคิวพร้อมที่จะได้รับการอ่าน คิวภายในจะติดตามข้อมูลส่วนที่ยังไม่ได้อ่าน

กลยุทธ์การจัดคิวคือออบเจ็กต์ที่กําหนดวิธีที่สตรีมควรส่งสัญญาณแรงดันย้อนกลับตามสถานะของคิวภายใน กลยุทธ์การจัดคิวจะกำหนดขนาดให้กับแต่ละกลุ่ม และเปรียบเทียบขนาดทั้งหมดของกลุ่มที่อยู่ในคิวกับจำนวนที่ระบุ ซึ่งเรียกว่าจำนวนสูงสุด

โปรแกรมอ่านจะอ่านข้อมูลในสตรีบ เครื่องอ่านนี้จะดึงข้อมูลทีละกลุ่ม ซึ่งช่วยให้คุณดำเนินการใดๆ กับข้อมูลได้ เครื่องอ่านและโค้ดการประมวลผลอื่นๆ ที่มาพร้อมกับเครื่องอ่านเรียกว่าผู้บริโภค

คอนสตรัคต์ถัดไปในบริบทนี้เรียกว่า คอนโทรลเลอร์ สตรีมที่อ่านได้แต่ละรายการจะมีตัวควบคุมที่เชื่อมโยงกัน ซึ่งช่วยให้คุณควบคุมสตรีมได้ดังชื่อที่ระบุ

มีเพียงผู้อ่านรายเดียวเท่านั้นที่อ่านสตรีมได้พร้อมกัน เมื่อสร้างผู้อ่านและเริ่มอ่านสตรีม (นั่นคือ กลายเป็นผู้อ่านที่ใช้งานอยู่) ระบบจะล็อกผู้อ่านรายนั้นไว้กับสตรีมนั้น หากต้องการให้ผู้อ่านคนอื่นอ่านสตรีมต่อ โดยปกติแล้วคุณจะต้องปล่อยผู้อ่านคนแรกก่อนดำเนินการอื่นๆ (แม้ว่าจะส่งต่อสตรีมได้ก็ตาม)

การสร้างสตรีมที่อ่านได้

คุณสร้างสตรีมที่อ่านได้โดยการเรียกคอนสตรัคเตอร์ของมัน ReadableStream() ตัวสร้างคอนสตรัคเตอร์มีพารามิเตอร์ underlyingSource ที่ไม่บังคับ ซึ่งแสดงถึงออบเจ็กต์ที่มีเมธอดและพร็อพเพอร์ตี้ซึ่งกำหนดลักษณะการทํางานของอินสแตนซ์สตรีมที่สร้างขึ้น

underlyingSource

ซึ่งจะใช้วิธีการต่อไปนี้ซึ่งนักพัฒนาแอปกำหนดได้

  • start(controller): เรียกใช้ทันทีเมื่อมีการสร้างออบเจ็กต์ วิธีการนี้สามารถเข้าถึงแหล่งที่มาของสตรีมและทำสิ่งอื่นๆ ที่จำเป็นในการตั้งค่าฟังก์ชันการทำงานของสตรีม หากกระบวนการนี้ทำงานแบบไม่พร้อมกัน เมธอดจะแสดงผลลัพธ์เพื่อบ่งบอกความสําเร็จหรือความล้มเหลวได้ พารามิเตอร์ controller ที่ส่งไปยังเมธอดนี้คือ a ReadableStreamDefaultController
  • pull(controller): ใช้เพื่อควบคุมสตรีมเมื่อมีการดึงข้อมูลกลุ่มเพิ่มเติม ระบบจะเรียกใช้ซ้ำๆ ตราบใดที่คิวภายในของกลุ่มของข้อมูลสตรีมยังไม่เต็ม จนกว่าคิวจะถึงจุดสูงสุด หากผลลัพธ์ของการเรียก pull() เป็นสัญญา ระบบจะไม่เรียก pull() อีกจนกว่าสัญญาดังกล่าวจะสำเร็จ หาก Promise ปฏิเสธ สตรีมจะแสดงข้อผิดพลาด
  • cancel(reason): เรียกใช้เมื่อผู้บริโภคสตรีมยกเลิกสตรีม
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController รองรับวิธีการต่อไปนี้

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

อาร์กิวเมนต์ที่ 2 ของคอนสตรคเตอร์ ReadableStream() ซึ่งไม่บังคับเช่นกันคือ queuingStrategy ซึ่งเป็นออบเจ็กต์ที่กำหนดกลยุทธ์การจัดคิวสําหรับสตรีม (ไม่บังคับ) ซึ่งใช้พารามิเตอร์ 2 รายการ ได้แก่

  • highWaterMark: ตัวเลขที่ไม่ใช่ค่าลบซึ่งระบุจำนวนสูงสุดของสตรีมที่ใช้กลยุทธ์การจัดคิวนี้
  • size(chunk): ฟังก์ชันที่คํานวณและแสดงผลขนาดที่ไม่ใช่ค่าลบแบบจํากัดของค่าข้อมูลในรายการที่ระบุ ระบบจะใช้ผลลัพธ์นี้เพื่อกำหนดแรงดันย้อนกลับ ซึ่งแสดงผ่านพร็อพเพอร์ตี้ ReadableStreamDefaultController.desiredSize ที่เหมาะสม และยังควบคุมเวลาที่เรียกใช้เมธอด pull() ของแหล่งที่มาพื้นฐานด้วย
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

วิธีการ getReader() และ read()

หากต้องการอ่านจากสตรีมที่อ่านได้ คุณต้องมีโปรแกรมอ่าน ซึ่งจะเป็น ReadableStreamDefaultReader เมธอด getReader() ของอินเทอร์เฟซ ReadableStream จะสร้างโปรแกรมอ่านและล็อกสตรีมไว้กับโปรแกรมอ่าน ขณะที่สตรีมล็อกอยู่ คุณจะไม่สามารถรับผู้อ่านรายอื่นได้จนกว่าจะปล่อยผู้อ่านรายนี้

เมธอด read() ของอินเทอร์เฟซ ReadableStreamDefaultReader จะแสดงผลพรอมต์ที่ให้สิทธิ์เข้าถึงข้อมูลส่วนถัดไปในคิวภายในของสตรีม โดยจะยอมรับหรือปฏิเสธโดยขึ้นอยู่กับสถานะของสตรีม โอกาสต่างๆ มีดังนี้

  • หากมีข้อมูลโค้ดพร้อมใช้งาน ระบบจะดำเนินการตามสัญญาด้วยออบเจ็กต์ของรูปแบบ
    { value: chunk, done: false }
  • หากสตรีมปิดอยู่ ระบบจะดำเนินการตามสัญญาด้วยออบเจ็กต์ของรูปแบบ
    { value: undefined, done: true }
  • หากสตรีมเกิดข้อผิดพลาด ระบบจะปฏิเสธการสัญญาพร้อมข้อผิดพลาดที่เกี่ยวข้อง
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

พร็อพเพอร์ตี้ locked

คุณสามารถตรวจสอบว่าสตรีมที่อ่านได้ถูกล็อกหรือไม่โดยไปที่พร็อพเพอร์ตี้ ReadableStream.locked ของสตรีมนั้น

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

ตัวอย่างโค้ดสตรีมที่อ่านได้

ตัวอย่างโค้ดด้านล่างแสดงขั้นตอนทั้งหมดที่ใช้งาน ก่อนอื่นให้สร้าง ReadableStream ที่กําหนดวิธีการ start() ในอาร์กิวเมนต์ underlyingSource (นั่นคือคลาส TimestampSource) วิธีนี้บอก controller ของสตรีมให้ประทับเวลาทุกวินาทีเป็นเวลา 10 วินาทีenqueue() สุดท้าย อุปกรณ์จะบอกให้ตัวควบคุมclose()สตรีม คุณใช้สตรีมนี้ได้โดยสร้างโปรแกรมอ่านผ่านเมธอด getReader() และเรียกใช้ read() จนกว่าสตรีมจะdone

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

การทำซ้ำแบบอะซิงโครนัส

การตรวจสอบว่าสตรีมเป็น done หรือไม่ในแต่ละรอบของ read() อาจไม่ใช่ API ที่สะดวกที่สุด แต่โชคดีที่อีกไม่นานจะมีวิธีที่ดีกว่านี้ในการดำเนินการนี้ ซึ่งก็คือการทำซ้ำแบบไม่พร้อมกัน

for await (const chunk of stream) {
  console.log(chunk);
}
การวนซ้ำแบบอะซิงโครนัส

วิธีแก้ปัญหาชั่วคราวในการใช้การทำซ้ำแบบอะซิงโครนัสในปัจจุบันคือการใช้ลักษณะการทำงานด้วย polyfill

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

การสร้างสตรีมที่อ่านได้

เมธอด tee() ของอินเทอร์เฟซ ReadableStream จะแยกสตรีมที่อ่านได้ปัจจุบันออก โดยจะแสดงผลอาร์เรย์ 2 องค์ประกอบที่มีสาขาที่เป็นผลลัพธ์ 2 รายการเป็นอินสแตนซ์ ReadableStream ใหม่ ซึ่งจะช่วยให้ผู้อ่าน 2 คนอ่านสตรีมพร้อมกันได้ คุณอาจทำเช่นนี้ใน Service Worker ในกรณีที่ต้องการดึงข้อมูลการตอบกลับจากเซิร์ฟเวอร์และสตรีมไปยังเบราว์เซอร์ รวมถึงสตรีมไปยังแคช Service Worker ด้วย เนื่องจากใช้เนื้อหาการตอบกลับได้เพียงครั้งเดียว คุณจึงต้องใช้สำเนา 2 รายการเพื่อทำเช่นนี้ หากต้องการยกเลิกสตรีม คุณจะต้องยกเลิกทั้ง 2 สาขาที่เกิดขึ้น โดยทั่วไปแล้ว การเปิดใช้สตรีมจะล็อกสตรีมนั้นไว้ตลอดระยะเวลาที่เปิดใช้ ซึ่งจะป้องกันไม่ให้ผู้อ่านรายอื่นล็อกสตรีมได้

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

สตรีมไบต์ที่อ่านได้

สําหรับสตรีมที่แสดงไบต์ ระบบจะจัดเตรียมสตรีมที่อ่านได้เวอร์ชันขยายเพื่อจัดการไบต์อย่างมีประสิทธิภาพ โดยเฉพาะอย่างยิ่งด้วยการลดการคัดลอก สตรีมไบต์ช่วยให้สามารถรับผู้อ่านแบบนำบัฟเฟอร์ของคุณเอง (BYOB) การใช้งานเริ่มต้นจะให้เอาต์พุตที่หลากหลาย เช่น สตริงหรือบัฟเฟอร์อาร์เรย์ในกรณีของ WebSockets ส่วนสตรีมไบต์จะรับประกันเอาต์พุตไบต์ นอกจากนี้ ผู้อ่าน BYOB ยังได้รับประโยชน์ด้านความเสถียรด้วย เนื่องจากหากบัฟเฟอร์แยกออก การดำเนินการนี้จะรับประกันว่าไม่มีการเขียนลงในบัฟเฟอร์เดียวกัน 2 ครั้ง จึงหลีกเลี่ยงเงื่อนไขการแข่งขันได้ เครื่องอ่าน BYOB สามารถลดจำนวนครั้งที่เบราว์เซอร์ต้องเรียกใช้การเก็บขยะได้ เนื่องจากสามารถนําบัฟเฟอร์มาใช้ซ้ำได้

การสร้างสตรีมไบต์ที่อ่านได้

คุณสามารถสร้างสตรีมไบต์ที่อ่านได้โดยการส่งพารามิเตอร์ type เพิ่มเติมไปยังคอนสตรคเตอร์ ReadableStream()

new ReadableStream({ type: 'bytes' });

underlyingSource

แหล่งที่มาของไบต์สตรีมที่อ่านได้จะได้รับ ReadableByteStreamController เพื่อใช้ดำเนินการ เมธอด ReadableByteStreamController.enqueue() ของคลาสนี้ใช้อาร์กิวเมนต์ chunk ที่มีค่าเป็น ArrayBufferView พร็อพเพอร์ตี้ ReadableByteStreamController.byobRequest จะแสดงคำขอดึง BYOB ในปัจจุบัน หรือแสดงค่าว่างหากไม่มี สุดท้าย พร็อพเพอร์ตี้ ReadableByteStreamController.desiredSize จะแสดงผลขนาดที่ต้องการเพื่อเติมคิวภายในของสตรีมที่ควบคุม

queuingStrategy

อาร์กิวเมนต์ที่ 2 ของคอนสตรัคเตอร์ ReadableStream() ซึ่งไม่บังคับเช่นกันคือ queuingStrategy ซึ่งเป็นออบเจ็กต์ที่กำหนดกลยุทธ์การจัดคิวสําหรับสตรีม (ไม่บังคับ) ซึ่งใช้พารามิเตอร์เดียว ดังนี้

  • highWaterMark: จำนวนไบต์ที่ไม่ใช่ค่าลบซึ่งระบุจำนวนสูงสุดของสตรีมที่ใช้กลยุทธ์การจัดคิวนี้ ข้อมูลนี้ใช้เพื่อกำหนดแรงดันย้อนกลับ ซึ่งแสดงผ่านพร็อพเพอร์ตี้ ReadableByteStreamController.desiredSize ที่เหมาะสม และยังควบคุมเวลาที่เรียกใช้เมธอด pull() ของแหล่งที่มาพื้นฐานด้วย

getReader() และread()

จากนั้นคุณจะได้รับสิทธิ์เข้าถึง ReadableStreamBYOBReader โดยการตั้งค่าพารามิเตอร์ mode ดังนี้ ReadableStream.getReader({ mode: "byob" }) วิธีนี้ช่วยให้ควบคุมการจัดสรรบัฟเฟอร์ได้แม่นยำยิ่งขึ้นเพื่อหลีกเลี่ยงการคัดลอก หากต้องการอ่านจากสตรีมไบต์ คุณต้องเรียกใช้ ReadableStreamBYOBReader.read(view) โดยที่ view คือ ArrayBufferView

ตัวอย่างโค้ดสตรีมไบต์ที่อ่านได้

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

ฟังก์ชันต่อไปนี้จะแสดงผลสตรีมไบต์ที่อ่านได้ ซึ่งช่วยให้อ่านอาร์เรย์ที่สร้างขึ้นแบบสุ่มแบบไม่ทำสำเนาได้อย่างมีประสิทธิภาพ แทนที่จะใช้ขนาดข้อมูลเป็นก้อนที่กําหนดไว้ล่วงหน้า 1,024 ข้อมูลจะพยายามเติมบัฟเฟอร์ที่นักพัฒนาแอประบุ ซึ่งช่วยให้ควบคุมได้อย่างเต็มที่

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

กลไกของสตรีมที่เขียนได้

สตรีมที่เขียนได้คือปลายทางที่คุณเขียนข้อมูลได้ ซึ่งแสดงใน JavaScript โดยออบเจ็กต์ WritableStream ซึ่งทำหน้าที่เป็นแอบสแตรกชันเหนือซิงค์ที่อยู่เบื้องหลัง ซึ่งเป็นซิงค์ I/O ระดับล่างที่เขียนข้อมูลดิบ

ระบบจะเขียนข้อมูลลงในสตรีมผ่านโปรแกรมเขียนทีละกลุ่ม ข้อมูลโค้ดมีได้หลายรูปแบบ เช่นเดียวกับข้อมูลโค้ดในโปรแกรมอ่าน คุณใช้โค้ดใดก็ได้เพื่อสร้างข้อมูลที่จะเขียนได้ โดยโปรแกรมเขียนโค้ดและโค้ดที่เกี่ยวข้องเรียกว่าโปรแกรมสร้าง

เมื่อสร้างโปรแกรมเขียนและเริ่มเขียนลงในสตรีม (โปรแกรมเขียนที่ทำงานอยู่) ระบบจะถือว่าโปรแกรมเขียนนั้นล็อกอยู่กับสตรีม มีเพียงผู้เขียนคนเดียวเท่านั้นที่เขียนลงในสตรีมแบบเขียนได้พร้อมกันได้ หากต้องการให้ผู้เขียนคนอื่นๆ เริ่มเขียนในสตรีมของคุณ โดยทั่วไปคุณจะต้องเผยแพร่สตรีมก่อน จากนั้นจึงแนบผู้เขียนคนอื่นๆ ในสตรีม

คิวภายในจะติดตามข้อมูลส่วนที่เขียนลงในสตรีมแล้วแต่ยังไม่ได้ประมวลผลโดยซิงค์พื้นฐาน

กลยุทธ์การจัดคิวคือออบเจ็กต์ที่กําหนดวิธีที่สตรีมควรส่งสัญญาณแรงดันย้อนกลับตามสถานะของคิวภายใน กลยุทธ์การจัดคิวจะกำหนดขนาดให้กับแต่ละกลุ่ม และเปรียบเทียบขนาดทั้งหมดของกลุ่มที่อยู่ในคิวกับจำนวนที่ระบุ ซึ่งเรียกว่าจำนวนสูงสุด

โครงสร้างสุดท้ายเรียกว่าตัวควบคุม สตรีมแบบเขียนได้แต่ละรายการจะมีตัวควบคุมที่เชื่อมโยงซึ่งช่วยให้คุณควบคุมสตรีมได้ (เช่น เพื่อหยุดกลางคัน)

การสร้างสตรีมที่เขียนได้

อินเทอร์เฟซ WritableStream ของ Streams API ให้การแยกแยะมาตรฐานสำหรับการเขียนข้อมูลสตรีมไปยังปลายทาง ซึ่งเรียกว่าซิงค์ ออบเจ็กต์นี้มีแรงดันย้อนกลับและการจัดคิวในตัว คุณสร้างสตรีมแบบเขียนได้โดยเรียกคอนสตรัคเตอร์ของ WritableStream() โดยมีพารามิเตอร์ underlyingSink ไม่บังคับ ซึ่งแสดงถึงออบเจ็กต์ที่มีเมธอดและพร็อพเพอร์ตี้ซึ่งกำหนดลักษณะการทํางานของอินสแตนซ์สตรีมที่สร้างขึ้น

underlyingSink

underlyingSink สามารถรวมวิธีการที่นักพัฒนาแอปกำหนดไว้ซึ่งไม่บังคับดังต่อไปนี้ พารามิเตอร์ controller ที่ส่งไปยังเมธอดบางรายการคือ WritableStreamDefaultController

  • start(controller): ระบบจะเรียกใช้เมธอดนี้ทันทีเมื่อมีการสร้างออบเจ็กต์ เนื้อหาของเมธอดนี้ควรมีจุดประสงค์เพื่อเข้าถึงซิงค์ที่อยู่เบื้องหลัง หากต้องการดำเนินการนี้แบบไม่พร้อมกัน ระบบจะแสดงผลลัพธ์เพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลว
  • write(chunk, controller): ระบบจะเรียกใช้เมธอดนี้เมื่อข้อมูลกลุ่มใหม่ (ที่ระบุไว้ในพารามิเตอร์ chunk) พร้อมที่จะเขียนลงในซิงค์ที่อยู่เบื้องหลัง การดำเนินการนี้จะแสดงผลลัพธ์เป็นสัญญาเพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลวของการดำเนินการเขียน ระบบจะเรียกใช้เมธอดนี้หลังจากที่การเขียนก่อนหน้านี้สำเร็จเท่านั้น และจะไม่เรียกใช้หลังจากสตรีมปิดหรือยกเลิก
  • close(controller): ระบบจะเรียกใช้เมธอดนี้หากแอปส่งสัญญาณว่าเขียนข้อมูลไปยังสตรีมเสร็จแล้ว เนื้อหาควรทำสิ่งที่จำเป็นทั้งหมดเพื่อเขียนข้อมูลไปยังที่เก็บข้อมูลย่อยให้เสร็จสมบูรณ์ และยกเลิกสิทธิ์เข้าถึง หากกระบวนการนี้เป็นแบบไม่พร้อมกัน ระบบจะแสดงผลลัพธ์เพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลว ระบบจะเรียกใช้เมธอดนี้หลังจากที่การเขียนทั้งหมดที่อยู่ในคิวดำเนินการเสร็จสมบูรณ์แล้วเท่านั้น
  • abort(reason): ระบบจะเรียกใช้เมธอดนี้หากแอปส่งสัญญาณว่าต้องการปิดสตรีมอย่างกะทันหันและทำให้สตรีมอยู่ในสถานะข้อผิดพลาด การดำเนินการนี้จะล้างทรัพยากรที่ถืออยู่ได้เช่นเดียวกับ close() แต่ระบบจะเรียก abort() แม้จะจัดคิวการเขียนไว้แล้วก็ตาม ระบบจะทิ้งข้อมูลเหล่านั้น หากกระบวนการนี้เป็นแบบไม่พร้อมกัน ระบบจะแสดงผลลัพธ์เป็นสัญญาเพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลว พารามิเตอร์ reason มี DOMString ที่อธิบายสาเหตุที่หยุดสตรีม
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

อินเทอร์เฟซของ Streams API WritableStreamDefaultController แสดงตัวควบคุมที่อนุญาตให้ควบคุมสถานะของ WritableStream ในระหว่างการตั้งค่า เมื่อส่งข้อมูลจำนวนมากขึ้นเพื่อเขียน หรือเมื่อสิ้นสุดการเขียน เมื่อสร้าง WritableStream ไปป์ไลน์ที่ฝังอยู่จะได้รับอินสแตนซ์ WritableStreamDefaultController ที่เกี่ยวข้องเพื่อดำเนินการ WritableStreamDefaultController มีเพียงเมธอดเดียวเท่านั้น ซึ่งก็คือ WritableStreamDefaultController.error() ซึ่งทําให้การทำงานกับสตรีมที่เกี่ยวข้องในอนาคตเกิดข้อผิดพลาด WritableStreamDefaultController ยังรองรับพร็อพเพอร์ตี้ signal ซึ่งจะแสดงผลอินสแตนซ์ของ AbortSignal ซึ่งช่วยให้หยุดการดำเนินการ WritableStream ได้หากจำเป็น

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

อาร์กิวเมนต์ที่ 2 ของคอนสตรัคเตอร์ WritableStream() ซึ่งไม่บังคับเช่นกันคือ queuingStrategy ซึ่งเป็นออบเจ็กต์ที่กำหนดกลยุทธ์การจัดคิวสําหรับสตรีม (ไม่บังคับ) ซึ่งใช้พารามิเตอร์ 2 รายการดังนี้

  • highWaterMark: ตัวเลขที่ไม่ใช่ค่าลบซึ่งระบุจำนวนสูงสุดของสตรีมที่ใช้กลยุทธ์การจัดคิวนี้
  • size(chunk): ฟังก์ชันที่คํานวณและแสดงผลขนาดที่ไม่ใช่ค่าลบแบบจํากัดของค่าข้อมูลในรายการที่ระบุ ระบบจะใช้ผลลัพธ์นี้เพื่อกำหนดแรงดันย้อนกลับ ซึ่งแสดงผ่านพร็อพเพอร์ตี้ WritableStreamDefaultWriter.desiredSize ที่เหมาะสม

วิธีการ getWriter() และ write()

หากต้องการเขียนลงในสตรีมที่เขียนได้ คุณต้องมีโปรแกรมเขียน ซึ่งจะเป็น WritableStreamDefaultWriter เมธอด getWriter() ของอินเทอร์เฟซ WritableStream จะแสดงผลอินสแตนซ์ WritableStreamDefaultWriter ใหม่และล็อกสตรีมไปยังอินสแตนซ์นั้น ขณะที่สตรีมถูกล็อก คุณจะไม่สามารถเพิ่มผู้เขียนคนอื่นๆ ได้จนกว่าจะมีการปล่อยผู้เขียนคนปัจจุบัน

เมธอด write() ของอินเตอร์เฟซ WritableStreamDefaultWriter จะเขียนข้อมูลกลุ่มที่ส่งไปยัง WritableStream และซิงค์ที่อยู่เบื้องหลัง จากนั้นจะแสดงผลลัพธ์เป็นสัญญาที่แสดงถึงความสำเร็จหรือความล้มเหลวของการดำเนินการเขียน โปรดทราบว่าความหมายของ "สำเร็จ" ขึ้นอยู่กับซิงค์ที่อยู่เบื้องหลัง ซึ่งอาจบ่งบอกว่าระบบยอมรับข้อมูลดังกล่าวแล้ว แต่ไม่จําเป็นว่าข้อมูลได้รับการบันทึกอย่างปลอดภัยไปยังปลายทาง

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

พร็อพเพอร์ตี้ locked

คุณสามารถตรวจสอบว่าสตรีมที่ใช้เขียนได้ถูกล็อกหรือไม่โดยไปที่พร็อพเพอร์ตี้ WritableStream.locked ของสตรีมนั้น

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

ตัวอย่างโค้ดสตรีมแบบเขียนได้

ตัวอย่างโค้ดด้านล่างแสดงขั้นตอนทั้งหมดที่ใช้งาน

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

การส่งผ่านสตรีมที่อ่านได้ไปยังสตรีมที่เขียนได้

สตรีมที่อ่านได้สามารถส่งผ่านไปยังสตรีมที่เขียนได้ผ่านเมธอด pipeTo() ของสตรีมที่อ่านได้ ReadableStream.pipeTo() จะส่งผ่าน ReadableStream ในปัจจุบันไปยัง WritableStream ที่ระบุ และแสดงผล Promise ที่สำเร็จเมื่อกระบวนการส่งผ่านเสร็จสมบูรณ์ หรือปฏิเสธหากพบข้อผิดพลาด

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

การสร้างสตรีมการเปลี่ยนรูปแบบ

อินเทอร์เฟซ TransformStream ของ Streams API แสดงชุดข้อมูลที่เปลี่ยนรูปแบบได้ คุณสร้างสตรีมการเปลี่ยนรูปแบบได้โดยเรียกคอนสตรัคเตอร์ TransformStream() ซึ่งจะสร้างและแสดงผลออบเจ็กต์สตรีมการเปลี่ยนรูปแบบจากตัวแฮนเดิลที่ระบุ ตัวสร้าง TransformStream() จะยอมรับออบเจ็กต์ JavaScript ที่ไม่บังคับซึ่งแสดงถึง transformer เป็นอาร์กิวเมนต์แรก ออบเจ็กต์ดังกล่าวอาจมีเมธอดใดก็ได้ต่อไปนี้

transformer

  • start(controller): ระบบจะเรียกใช้เมธอดนี้ทันทีเมื่อมีการสร้างออบเจ็กต์ โดยปกติแล้วจะใช้เพื่อจัดคิวกลุ่มคำนำหน้าโดยใช้ controller.enqueue() ระบบจะอ่านข้อมูลดังกล่าวจากฝั่งที่อ่านได้ แต่ไม่ขึ้นอยู่กับการเขียนไปยังฝั่งที่เขียนได้ หากกระบวนการเริ่มต้นนี้เป็นแบบไม่พร้อมกัน เช่น เนื่องจากต้องใช้เวลาในการรับข้อมูลโค้ดนำหน้า ฟังก์ชันจะแสดงผลพรอมต์เพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลวได้ พรอมต์ที่ถูกปฏิเสธจะทำให้สตรีมแสดงข้อผิดพลาด ตัวสร้าง TransformStream() จะโยนข้อยกเว้นที่โยน
  • transform(chunk, controller): ระบบจะเรียกใช้เมธอดนี้เมื่อข้อมูลใหม่ซึ่งเขียนไปยังฝั่งที่เขียนได้พร้อมที่จะเปลี่ยนรูปแบบแล้ว การใช้งานสตรีมรับประกันว่าฟังก์ชันนี้จะเรียกใช้หลังจากที่การแปลงก่อนหน้าเสร็จสมบูรณ์แล้วเท่านั้น และจะไม่เรียกใช้ก่อน start() เสร็จสมบูรณ์หรือหลังจากเรียกใช้ flush() แล้ว ฟังก์ชันนี้จะทํางานเปลี่ยนรูปแบบจริงของสตรีมการเปลี่ยนรูปแบบ ซึ่งสามารถจัดคิวผลลัพธ์ได้โดยใช้ controller.enqueue() ซึ่งอนุญาตให้เขียนข้อมูลไปยังฝั่งที่เขียนได้เพียงครั้งเดียว แต่ฝั่งที่อ่านได้อาจมีข้อมูลเป็น 0 หรือหลายรายการก็ได้ ทั้งนี้ขึ้นอยู่กับจำนวนครั้งที่เรียก controller.enqueue() หากกระบวนการเปลี่ยนรูปแบบเป็นแบบไม่เป็นเชิงเส้น ฟังก์ชันนี้จะแสดงผลพรอมต์เพื่อบ่งบอกความสําเร็จหรือความล้มเหลวของการเปลี่ยนรูปแบบ พรอมต์ที่ถูกปฏิเสธจะทำให้เกิดข้อผิดพลาดทั้งฝั่งที่อ่านได้และเขียนได้ของสตรีมการเปลี่ยนรูปแบบ หากไม่ได้ระบุเมธอด transform() ระบบจะใช้การเปลี่ยนรูปแบบข้อมูลระบุตัวตน ซึ่งจะจัดคิวข้อมูลส่วนที่ไม่มีการแก้ไขจากฝั่งที่เขียนได้ไปยังฝั่งที่อ่านได้
  • flush(controller): ระบบจะเรียกใช้เมธอดนี้หลังจากที่แปลงข้อมูลทั้งหมดที่เขียนลงในฝั่งที่เขียนได้ผ่าน transform() เรียบร้อยแล้ว และกำลังจะปิดฝั่งที่เขียนได้ โดยปกติแล้วจะใช้เพื่อจัดคิวกลุ่มส่วนต่อท้ายไปยังฝั่งที่อ่านได้ ก่อนที่ฝั่งนั้นจะปิดลงด้วย หากการล้างข้อมูลเป็นแบบไม่เป็นเชิงเวลา ฟังก์ชันจะแสดงผลลัพธ์เป็นสัญญาเพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลว และระบบจะสื่อสารผลลัพธ์ไปยังผู้เรียกใช้ stream.writable.write() นอกจากนี้ พรอมต์ที่ถูกปฏิเสธจะทำให้เกิดข้อผิดพลาดทั้งฝั่งที่อ่านได้และเขียนได้ของสตรีม การยกเว้นข้อยกเว้นจะถือว่าเหมือนกับการคืนค่า Promise ที่ปฏิเสธ
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

กลยุทธ์การจัดคิว writableStrategy และ readableStrategy

พารามิเตอร์ที่ไม่บังคับลำดับที่ 2 และ 3 ของคอนสตรคเตอร์ TransformStream() จะใช้หรือไม่ก็ได้ writableStrategy และกลยุทธ์การจัดคิว readableStrategy โดยมีการกําหนดไว้ดังที่ระบุไว้ในส่วนสตรีมที่อ่านได้และที่เขียนได้ตามลําดับ

ตัวอย่างโค้ดเปลี่ยนรูปแบบสตรีม

ตัวอย่างโค้ดต่อไปนี้แสดงสตรีมการเปลี่ยนรูปแบบแบบง่ายที่ทำงานอยู่

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

การส่งผ่านสตรีมที่อ่านได้ผ่านสตรีมการเปลี่ยนรูปแบบ

เมธอด pipeThrough() ของอินเทอร์เฟซ ReadableStream มีวิธีต่อเชื่อมสตรีมปัจจุบันผ่านสตรีมการเปลี่ยนรูปแบบหรือคู่อื่นๆ ที่เขียนได้/อ่านได้ โดยทั่วไปแล้ว การส่งผ่านสตรีมจะล็อกสตรีมนั้นไว้ตลอดระยะเวลาการส่งผ่าน ซึ่งจะป้องกันไม่ให้ผู้อ่านรายอื่นล็อกสตรีมได้

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

ตัวอย่างโค้ดถัดไป (ค่อนข้างซับซ้อน) แสดงวิธีใช้ fetch() เวอร์ชัน "ตะโกน" ซึ่งเปลี่ยนข้อความทั้งหมดเป็นตัวพิมพ์ใหญ่โดยใช้ Promise การตอบกลับที่แสดงผลเป็นสตรีม และเปลี่ยนเป็นตัวพิมพ์ใหญ่ทีละส่วน ข้อดีของวิธีการนี้คือคุณไม่จําเป็นต้องรอให้ดาวน์โหลดเอกสารทั้งฉบับ ซึ่งจะทําให้เกิดความแตกต่างอย่างมากเมื่อจัดการกับไฟล์ขนาดใหญ่

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

สาธิต

ตัวอย่างด้านล่างแสดงสตรีมที่อ่านได้ เขียนได้ และเปลี่ยนรูปแบบ รวมถึงมีตัวอย่างโซ่ท่อ pipeThrough() และ pipeTo() รวมถึงสาธิต tee() ด้วย คุณสามารถเลือกที่จะเรียกใช้เดโมในหน้าต่างของตัวเองหรือดูซอร์สโค้ดก็ได้

สตรีมมีประโยชน์ที่พร้อมใช้งานในเบราว์เซอร์

มีสตรีมที่มีประโยชน์หลายรายการที่สร้างไว้ในเบราว์เซอร์ คุณสร้าง ReadableStream จาก Blob ได้ง่ายๆ เมธอด stream() ของอินเทอร์เฟซ Blob จะแสดงผล ReadableStream ซึ่งจะแสดงข้อมูลที่อยู่ใน Blob เมื่ออ่าน นอกจากนี้ โปรดทราบว่าออบเจ็กต์ File เป็น Blob ประเภทหนึ่งๆ และสามารถใช้ในบริบทใดก็ได้ที่ Blob ใช้ได้

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

ตัวแปรสตรีมมิงของ TextDecoder.decode() และ TextEncoder.encode() จะเรียกว่า TextDecoderStream และ TextEncoderStream ตามลำดับ

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

การบีบอัดหรือคลายการบีบอัดไฟล์นั้นทำได้ง่ายโดยใช้สตรีมการเปลี่ยนรูปแบบ CompressionStream และ DecompressionStream ตามลำดับ ตัวอย่างโค้ดด้านล่างแสดงวิธีดาวน์โหลดข้อกำหนดของ Streams, บีบอัด (gzip) ไฟล์ในเบราว์เซอร์โดยตรง และเขียนไฟล์ที่บีบอัดไปยังดิสก์โดยตรง

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream ของ File System Access API และfetch()สตรีมคำขอเวอร์ชันทดลองเป็นตัวอย่างของสตรีมที่เขียนได้ในโลกแห่งความเป็นจริง

Serial API ใช้สตรีมทั้งแบบอ่านได้และเขียนได้

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

สุดท้าย WebSocketStream API จะผสานรวมสตรีมกับ WebSocket API

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

แหล่งข้อมูลที่มีประโยชน์

ขอขอบคุณ

บทความนี้ได้รับการตรวจสอบโดย Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley และ Adam Rice บล็อกโพสต์ของ Jake Archibald ช่วยฉันได้มากในการทําความเข้าใจสตรีม ตัวอย่างโค้ดบางส่วนได้รับแรงบันดาลใจจากการสํารวจของผู้ใช้ GitHub @bellbind และส่วนหนึ่งของข้อความเขียนขึ้นโดยอิงตามเอกสารประกอบเว็บ MDN สําหรับสตรีม ผู้เขียนมาตรฐานสตรีมทํางานได้อย่างยอดเยี่ยมในการเขียนข้อกําหนดนี้ รูปภาพหลักโดย Ryan Lara จาก Unsplash