สตรีม—คู่มือฉบับสมบูรณ์

ดูวิธีใช้สตรีมที่อ่านได้ เขียนได้ และแปลงได้ด้วย Streams API

Streams API ช่วยให้คุณเข้าถึงสตรีมข้อมูลที่ได้รับผ่านเครือข่ายหรือสร้างขึ้นด้วยวิธีใดก็ตามในเครื่อง และประมวลผลด้วย JavaScript แบบเป็นโปรแกรมได้ การสตรีมเกี่ยวข้องกับการแบ่งทรัพยากรที่คุณต้องการรับ ส่ง หรือแปลง เป็นก้อนเล็กๆ แล้วประมวลผลก้อนเหล่านี้ทีละเล็กทีละน้อย แม้ว่าการสตรีมจะเป็นสิ่งที่เบราว์เซอร์ทำอยู่แล้วเมื่อได้รับชิ้นงาน เช่น HTML หรือวิดีโอที่จะแสดงในหน้าเว็บ แต่ความสามารถนี้ก็ไม่เคยมีให้ใช้งานใน JavaScript มาก่อนจนกระทั่งมีการเปิดตัว fetch พร้อมสตรีมในปี 2015

ก่อนหน้านี้ หากต้องการประมวลผลทรัพยากรบางอย่าง (ไม่ว่าจะเป็นวิดีโอ ไฟล์ข้อความ ฯลฯ) คุณจะต้องดาวน์โหลดไฟล์ทั้งหมด รอให้ระบบยกเลิกการซีเรียลไลซ์เป็นรูปแบบที่เหมาะสม แล้วจึงประมวลผล แต่เมื่อสตรีมพร้อมใช้งานใน JavaScript ทุกอย่างก็เปลี่ยนไป ตอนนี้คุณสามารถประมวลผลข้อมูลดิบด้วย JavaScript ได้ทีละรายการทันทีที่ข้อมูลพร้อมใช้งานในไคลเอ็นต์ โดยไม่ต้องสร้างบัฟเฟอร์ สตริง หรือ Blob ซึ่งจะช่วยให้คุณใช้ฟีเจอร์นี้ได้ในหลายกรณี ดังที่ฉันได้ระบุไว้ด้านล่าง

  • เอฟเฟกต์วิดีโอ: การส่งสตรีมวิดีโอที่อ่านได้ผ่านสตรีมการแปลงที่ใช้เอฟเฟกต์ แบบเรียลไทม์
  • (ยกเลิก)การบีบอัดข้อมูล: การส่งสตรีมไฟล์ผ่านสตรีมการแปลงที่(ยกเลิก)การบีบอัดอย่างเลือกสรร
  • การถอดรหัสรูปภาพ: การส่งสตรีมการตอบกลับ HTTP ผ่านสตรีมการแปลงที่ถอดรหัสไบต์ เป็นข้อมูลบิตแมป แล้วส่งผ่านสตรีมการแปลงอีกรายการที่แปลงบิตแมปเป็น PNG หากติดตั้งภายในแฮนเดิล fetch ของ Service Worker คุณจะสามารถ Polyfill รูปแบบรูปภาพใหม่ๆ เช่น AVIF ได้อย่างโปร่งใส

การสนับสนุนเบราว์เซอร์

ReadableStream และ WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

แนวคิดหลัก

ก่อนที่จะลงรายละเอียดเกี่ยวกับสตรีมประเภทต่างๆ เรามาทำความรู้จักแนวคิดหลักบางอย่างกันก่อน

แบบเปียก

ก้อนข้อมูลคือข้อมูลชิ้นเดียวที่เขียนลงในหรืออ่านจากสตรีม โดยสตรีมอาจเป็นประเภทใดก็ได้ หรืออาจมีกลุ่มข้อมูลประเภทต่างๆ ก็ได้ โดยส่วนใหญ่แล้ว Chunk จะไม่ใช่หน่วยข้อมูลที่เล็กที่สุด สำหรับสตรีมที่กำหนด เช่น สตรีมไบต์อาจมีกลุ่มที่ประกอบด้วยหน่วย 16 KiB Uint8Array แทนที่จะเป็นไบต์เดียว

สตรีมที่อ่านได้

สตรีมที่อ่านได้แสดงถึงแหล่งข้อมูลที่คุณอ่านได้ กล่าวคือ ข้อมูลออกมาจากสตรีมที่อ่านได้ กล่าวอย่างเจาะจงคือ สตรีมที่อ่านได้คืออินสแตนซ์ของคลาส ReadableStream

สตรีมที่เขียนได้

สตรีมที่เขียนได้แสดงถึงปลายทางของข้อมูลที่คุณเขียนได้ กล่าวคือ ข้อมูล จะเข้าไปในสตรีมที่เขียนได้ กล่าวอย่างเจาะจงคือสตรีมที่เขียนได้คืออินสแตนซ์ของคลาส WritableStream

เปลี่ยนรูปแบบสตรีม

สตรีมการแปลงประกอบด้วยสตรีม 2 รายการ ได้แก่ สตรีมที่เขียนได้ ซึ่งเรียกว่าด้านที่เขียนได้ และสตรีมที่อ่านได้ ซึ่งเรียกว่าด้านที่อ่านได้ คำอุปมาในชีวิตจริงสำหรับเรื่องนี้คือล่ามแปลพร้อม ที่แปลจากภาษาหนึ่งเป็นอีกภาษาหนึ่งในทันที ในลักษณะที่เฉพาะเจาะจงกับสตรีมการแปลง การเขียนไปยังด้านที่เขียนได้จะทําให้ข้อมูลใหม่พร้อมให้อ่านจากด้านที่อ่านได้ กล่าวอย่างเจาะจงคือ ออบเจ็กต์ใดก็ตามที่มีพร็อพเพอร์ตี้ writable และพร็อพเพอร์ตี้ readable จะทำหน้าที่เป็นสตรีมการเปลี่ยนรูปแบบได้ อย่างไรก็ตาม TransformStream คลาสมาตรฐานจะช่วยให้สร้าง คู่ดังกล่าวที่พันกันอย่างเหมาะสมได้ง่ายขึ้น

โซ่ท่อ

โดยจะใช้สตรีมเป็นหลักด้วยการส่งผ่านสตรีมไปยังกันและกัน คุณสามารถส่งสตรีมที่อ่านได้ไปยังสตรีมที่เขียนได้โดยตรง โดยใช้เมธอด pipeTo() ของสตรีมที่อ่านได้ หรือจะส่งผ่านสตรีมการแปลงอย่างน้อย 1 รายการก่อนก็ได้โดยใช้เมธอด pipeThrough() ของสตรีมที่อ่านได้ ชุดของสตรีมที่เชื่อมต่อกันในลักษณะนี้เรียกว่าเชนไปป์

แรงดันย้อนกลับ

เมื่อสร้างไปป์เชนแล้ว ระบบจะเผยแพร่สัญญาณเกี่ยวกับความเร็วที่ควรส่งผ่านก้อนข้อมูล ผ่านไปป์เชน หากขั้นตอนใดในห่วงโซ่ยังรับก้อนข้อมูลไม่ได้ ระบบจะส่งสัญญาณย้อนกลับ ผ่านห่วงโซ่ไปป์ จนกว่าในที่สุดแหล่งที่มาต้นทางจะได้รับแจ้งให้หยุดสร้างก้อนข้อมูลเร็ว ขนาดนั้น กระบวนการการทำให้การไหลเป็นปกตินี้เรียกว่าการควบคุมอัตราการส่ง

การทีออฟ

คุณสามารถแยกสตรีมที่อ่านได้ (ตั้งชื่อตามรูปร่างของตัว "T" พิมพ์ใหญ่) โดยใช้เมธอด tee() การดำเนินการนี้จะล็อกสตรีม ซึ่งหมายความว่าสตรีมจะใช้โดยตรงไม่ได้อีกต่อไป แต่จะสร้างสตรีมใหม่ 2 รายการที่เรียกว่า "สาขา" ซึ่งสามารถใช้แยกกันได้ การทีออฟยังมีความสำคัญเนื่องจากสตรีมไม่สามารถกรอหรือรีสตาร์ทได้ เราจะพูดถึงเรื่องนี้ในภายหลัง

แผนภาพของห่วงโซ่ไปป์ที่ประกอบด้วยสตรีมที่อ่านได้ซึ่งมาจากการเรียกใช้ Fetch API จากนั้นจะส่งผ่านสตรีมการแปลงที่มีเอาต์พุตเป็นแบบที แล้วส่งไปยังเบราว์เซอร์สำหรับสตรีมที่อ่านได้แรกที่ได้ และไปยังแคชของ Service Worker สำหรับสตรีมที่อ่านได้ที่ 2 ที่ได้
เชนไปป์

กลไกของสตรีมที่อ่านได้

สตรีมที่อ่านได้คือแหล่งข้อมูลที่แสดงใน JavaScript โดยออบเจ็กต์ ReadableStream ซึ่งไหลจากแหล่งที่มาพื้นฐาน ตัวสร้าง ReadableStream() จะสร้างและแสดงผลออบเจ็กต์สตรีมที่อ่านได้จากแฮนเดิลที่ระบุ แหล่งที่มาพื้นฐานมี 2 ประเภท ได้แก่

  • แหล่งที่มาแบบพุชจะพุชข้อมูลให้คุณอย่างต่อเนื่องเมื่อคุณเข้าถึงแหล่งที่มานั้น และคุณเป็นผู้มีสิทธิ์ เริ่ม หยุดชั่วคราว หรือยกเลิกการเข้าถึงสตรีม ตัวอย่างเช่น สตรีมวิดีโอสด เหตุการณ์ที่เซิร์ฟเวอร์ส่ง หรือ WebSocket
  • แหล่งที่มาแบบดึงกำหนดให้คุณต้องขอข้อมูลจากแหล่งที่มาอย่างชัดเจนเมื่อเชื่อมต่อแล้ว ตัวอย่าง ได้แก่ การดำเนินการ HTTP ผ่านการเรียกใช้ fetch() หรือ XMLHttpRequest

ระบบจะอ่านข้อมูลสตรีมตามลำดับเป็นชิ้นเล็กๆ ที่เรียกว่าก้อน โดยก้อนข้อมูลที่วางในสตรีมจะเรียกว่าจัดคิว ซึ่งหมายความว่าข้อความเหล่านั้นกำลังรออยู่ในคิว พร้อมให้คุณอ่าน คิวภายในจะติดตามก้อนข้อมูลที่ยังไม่ได้อ่าน

กลยุทธ์การจัดคิวคือออบเจ็กต์ที่กำหนดวิธีที่สตรีมควรส่งสัญญาณแรงดันย้อนกลับตาม สถานะของคิวภายใน กลยุทธ์การจัดคิวจะกำหนดขนาดให้กับแต่ละก้อน และเปรียบเทียบ ขนาดรวมของก้อนทั้งหมดในคิวกับตัวเลขที่ระบุ ซึ่งเรียกว่าเครื่องหมายระดับสูง

โปรแกรมอ่านจะอ่านก้อนข้อมูลภายในสตรีม โปรแกรมอ่านนี้จะดึงข้อมูลทีละก้อน ทำให้คุณดำเนินการใดๆ กับข้อมูลได้ตามต้องการ โปรแกรมอ่านและโค้ดประมวลผลอื่นๆ ที่มาพร้อมกับโปรแกรมอ่านเรียกว่าผู้ใช้

โครงสร้างถัดไปในบริบทนี้เรียกว่าคอนโทรลเลอร์ สตรีมที่อ่านได้แต่ละรายการจะมีตัวควบคุมที่เชื่อมโยงอยู่ ซึ่งช่วยให้คุณควบคุมสตรีมได้ตามชื่อ

มีผู้อ่านได้ครั้งละ 1 คนเท่านั้น เมื่อสร้างผู้อ่านและเริ่มอ่านสตรีม (กล่าวคือ กลายเป็นผู้อ่านที่ใช้งานอยู่) ระบบจะล็อกผู้อ่านนั้น หากต้องการให้ผู้อ่านคนอื่นอ่านสตรีมของคุณ คุณมักจะต้องปล่อยผู้อ่านคนแรกก่อนที่จะทำอย่างอื่น (แม้ว่าคุณจะแยกสตรีมได้ก็ตาม)

การสร้างสตรีมที่อ่านได้

คุณสร้างสตรีมที่อ่านได้โดยการเรียกตัวสร้างของสตรีมนั้น ReadableStream() ตัวสร้างมีอาร์กิวเมนต์ที่ไม่บังคับ underlyingSource ซึ่งแสดงถึงออบเจ็กต์ ที่มีเมธอดและพร็อพเพอร์ตี้ที่กำหนดลักษณะการทำงานของอินสแตนซ์สตรีมที่สร้างขึ้น

underlyingSource

ซึ่งสามารถใช้วิธีการต่อไปนี้ที่นักพัฒนาแอปกำหนดได้ (ไม่บังคับ)

  • start(controller): เรียกใช้ทันทีเมื่อสร้างออบเจ็กต์ เมธอดสามารถเข้าถึงแหล่งที่มาของสตรีมและทำสิ่งอื่นๆ ที่จำเป็นในการตั้งค่าฟังก์ชันการทำงานของสตรีม หากต้องการดำเนินการกระบวนการนี้แบบไม่พร้อมกัน เมธอดจะ ส่งคืน Promise เพื่อส่งสัญญาณว่าสำเร็จหรือไม่สำเร็จ พารามิเตอร์ controller ที่ส่งไปยังเมธอดนี้คือ a ReadableStreamDefaultController
  • pull(controller): ใช้เพื่อควบคุมสตรีมเมื่อมีการดึงข้อมูลก้อนข้อมูลเพิ่มเติม ระบบจะเรียกใช้ฟังก์ชันนี้ซ้ำๆ ตราบใดที่คิวภายในของสตรีมยังไม่เต็ม จนกว่าคิวจะถึงเครื่องหมายระดับน้ำสูงสุด หากผลลัพธ์ของการเรียกใช้ pull() เป็น Promise ระบบจะไม่เรียกใช้ pull() อีกจนกว่า Promise ดังกล่าวจะดำเนินการเสร็จสมบูรณ์ หาก Promise ปฏิเสธ สตรีมจะเกิดข้อผิดพลาด
  • cancel(reason): เรียกใช้เมื่อผู้ใช้สตรีมยกเลิกสตรีม
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController รองรับวิธีการต่อไปนี้

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

อาร์กิวเมนต์ที่ 2 ของตัวสร้าง ReadableStream() ซึ่งจะใส่หรือไม่ใส่ก็ได้คือ queuingStrategy เป็นออบเจ็กต์ที่กําหนดกลยุทธ์การจัดคิวสําหรับสตรีมได้ (ไม่บังคับ) ซึ่งมีพารามิเตอร์ 2 รายการดังนี้

  • highWaterMark: ตัวเลขที่ไม่เป็นลบซึ่งระบุเครื่องหมายสูงสุดของสตรีมโดยใช้กลยุทธ์การจัดคิวนี้
  • size(chunk): ฟังก์ชันที่คำนวณและแสดงผลขนาดที่ไม่เป็นลบแบบจำกัดของค่าก้อนที่ระบุ ระบบจะใช้ผลลัพธ์เพื่อกำหนดแรงดันย้อนกลับ ซึ่งจะแสดงผ่านพร็อพเพอร์ตี้ ReadableStreamDefaultController.desiredSize ที่เหมาะสม นอกจากนี้ยังควบคุมเวลาที่เรียกใช้เมธอด pull() ของแหล่งที่มาพื้นฐานด้วย
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

วิธีการ getReader() และ read()

หากต้องการอ่านจากสตรีมที่อ่านได้ คุณต้องมีโปรแกรมอ่านซึ่งจะเป็น ReadableStreamDefaultReader เมธอด getReader() ของอินเทอร์เฟซ ReadableStream จะสร้างโปรแกรมอ่านและล็อกสตรีมไว้ ในขณะที่สตรีมล็อกอยู่ คุณจะซื้อสตรีมอื่นไม่ได้จนกว่าสตรีมนี้จะพร้อมให้รับชม

เมธอด read() ของอินเทอร์เฟซ ReadableStreamDefaultReader จะแสดงผล Promise ที่ให้สิทธิ์เข้าถึง ก้อนข้อมูลถัดไปในคิวภายในของสตรีม โดยจะดำเนินการหรือปฏิเสธพร้อมผลลัพธ์ตามสถานะของสตรีม ความเป็นไปได้ต่างๆ มีดังนี้

  • หากมี Chunk อยู่ ระบบจะทำตามสัญญาด้วยออบเจ็กต์ในรูปแบบ
    { value: chunk, done: false }
  • หากสตรีมปิดลง สัญญาจะได้รับการตอบสนองด้วยออบเจ็กต์ในรูปแบบ
    { value: undefined, done: true }
  • หากสตรีมเกิดข้อผิดพลาด ระบบจะปฏิเสธ Promise พร้อมข้อผิดพลาดที่เกี่ยวข้อง
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

พร็อพเพอร์ตี้ locked

คุณสามารถตรวจสอบว่าสตรีมที่อ่านได้ถูกล็อกหรือไม่โดยเข้าถึงพร็อพเพอร์ตี้ ReadableStream.locked ของสตรีม

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

ตัวอย่างโค้ดสตรีมที่อ่านได้

โค้ดตัวอย่างด้านล่างแสดงขั้นตอนทั้งหมดที่ดำเนินการ คุณต้องสร้าง ReadableStream ก่อน ซึ่งอาร์กิวเมนต์ของ underlyingSource (นั่นคือคลาส TimestampSource) จะกำหนดเมธอด start() วิธีนี้จะบอก controller ของสตรีมให้ enqueue() ประทับเวลาทุกวินาทีเป็นเวลา 10 วินาที สุดท้ายคือบอกให้ตัวควบคุมclose()สตรีม คุณใช้สตรีมนี้ได้โดยการสร้างโปรแกรมอ่านผ่านเมธอด getReader() และเรียกใช้ read() จนกว่าสตรีมจะdone

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

การทำซ้ำแบบอะซิงโครนัส

การตรวจสอบว่าสตรีมเป็น done ในแต่ละread()การวนซ้ำของลูปอาจไม่ใช่ API ที่สะดวกที่สุด โชคดีที่ในเร็วๆ นี้จะมีวิธีที่ดีกว่าในการทำสิ่งนี้ นั่นคือการทำซ้ำแบบไม่พร้อมกัน

for await (const chunk of stream) {
  console.log(chunk);
}

วิธีแก้ปัญหาชั่วคราวในการใช้การทำซ้ำแบบอะซิงโครนัสในปัจจุบันคือการใช้ลักษณะการทำงานกับ Polyfill

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

การแยกสตรีมที่อ่านได้

เมธอด tee() ของอินเทอร์เฟซ ReadableStream จะตั้งค่าสตรีมที่อ่านได้ปัจจุบัน โดยจะแสดงผลอาร์เรย์ที่มี 2 องค์ประกอบ ซึ่งมีกิ่งก้าน 2 กิ่งที่ได้เป็นอินสแตนซ์ ReadableStream ใหม่ ซึ่งจะช่วยให้ ผู้อ่าน 2 คนอ่านสตรีมพร้อมกันได้ คุณอาจทำเช่นนี้ใน Service Worker หากต้องการดึงข้อมูลการตอบกลับจากเซิร์ฟเวอร์และสตรีมไปยังเบราว์เซอร์ แต่ก็สตรีมไปยังแคชของ Service Worker ด้วย เนื่องจากใช้เนื้อหาการตอบกลับมากกว่า 1 ครั้งไม่ได้ คุณจึงต้องมีสำเนา 2 ชุด เพื่อดำเนินการนี้ หากต้องการยกเลิกสตรีม คุณจะต้องยกเลิกทั้ง 2 สาขาที่ได้ โดยทั่วไปแล้ว การเริ่มสตรีมจะล็อกสตรีมนั้นตลอดระยะเวลาที่สตรีมอยู่ ซึ่งจะป้องกันไม่ให้ผู้อ่านคนอื่นๆ ล็อกสตรีม

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

สตรีมไบต์ที่อ่านได้

สำหรับสตรีมที่แสดงไบต์ ระบบจะจัดเตรียมสตรีมที่อ่านได้เวอร์ชันขยายเพื่อจัดการไบต์อย่างมีประสิทธิภาพ โดยเฉพาะอย่างยิ่งด้วยการลดการคัดลอก สตรีมไบต์ช่วยให้ได้ผู้ติดตามที่นำบัฟเฟอร์ของตนเองมาใช้ (BYOB) การติดตั้งใช้งานเริ่มต้นอาจให้เอาต์พุตที่แตกต่างกัน เช่น สตริงหรือบัฟเฟอร์อาร์เรย์ในกรณีของ WebSocket ในขณะที่สตรีมไบต์จะรับประกันเอาต์พุตไบต์ นอกจากนี้ ผู้อ่าน BYOB ยังได้รับประโยชน์ด้านความเสถียรด้วย เนื่องจากหากบัฟเฟอร์แยกออก ก็จะรับประกันได้ว่าไม่มีการเขียนลงในบัฟเฟอร์เดียวกัน 2 ครั้ง จึงหลีกเลี่ยงการแข่งขันได้ โปรแกรมอ่าน BYOB จะช่วยลดจำนวนครั้งที่เบราว์เซอร์ต้องเรียกใช้ การเก็บขยะได้ เนื่องจากสามารถนำบัฟเฟอร์กลับมาใช้ใหม่ได้

การสร้างไบต์สตรีมที่อ่านได้

คุณสร้างสตรีมไบต์ที่อ่านได้โดยส่งพารามิเตอร์ type เพิ่มเติมไปยังตัวสร้าง ReadableStream()

new ReadableStream({ type: 'bytes' });

underlyingSource

แหล่งที่มาพื้นฐานของสตรีมไบต์ที่อ่านได้จะได้รับ ReadableByteStreamController เพื่อ จัดการ เมธอด ReadableByteStreamController.enqueue() ของคลาสนี้รับอาร์กิวเมนต์ chunk ซึ่งมีค่าเป็น ArrayBufferView พร็อพเพอร์ตี้ ReadableByteStreamController.byobRequest จะแสดงผลคำขอพุล BYOB ปัจจุบัน หรือค่า Null หากไม่มี สุดท้าย ReadableByteStreamController.desiredSize พร็อพเพอร์ตี้จะส่งคืนขนาดที่ต้องการเพื่อเติมคิวภายในของสตรีมที่ควบคุม

queuingStrategy

อาร์กิวเมนต์ที่ 2 ของตัวสร้าง ReadableStream() ซึ่งจะใส่หรือไม่ใส่ก็ได้คือ queuingStrategy เป็นออบเจ็กต์ที่กําหนดกลยุทธ์การจัดคิวสําหรับสตรีมได้โดยไม่บังคับ ซึ่งมีพารามิเตอร์ 1 รายการ ดังนี้

  • highWaterMark: จำนวนไบต์ที่ไม่เป็นลบซึ่งระบุเครื่องหมายระดับสูงสุดของสตรีมโดยใช้กลยุทธ์การจัดคิวนี้ ระบบจะใช้ข้อมูลนี้เพื่อกำหนดแรงดันย้อนกลับ ซึ่งแสดงผ่านพร็อพเพอร์ตี้ ReadableByteStreamController.desiredSize ที่เหมาะสม นอกจากนี้ยังควบคุมเวลาที่เรียกใช้เมธอด pull() ของแหล่งที่มาพื้นฐานด้วย

วิธีการ getReader() และ read()

จากนั้นคุณจะเข้าถึง ReadableStreamBYOBReader ได้โดยตั้งค่าพารามิเตอร์ mode ตามนั้น ReadableStream.getReader({ mode: "byob" }) ซึ่งช่วยให้ควบคุมการจัดสรรบัฟเฟอร์ได้แม่นยำยิ่งขึ้นเพื่อหลีกเลี่ยงการคัดลอก หากต้องการอ่านจากสตรีมไบต์ คุณต้องเรียกใช้ ReadableStreamBYOBReader.read(view) โดยที่ view คือ ArrayBufferView

ตัวอย่างโค้ด Readable Byte Stream

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

ฟังก์ชันต่อไปนี้จะแสดงผลสตรีมไบต์ที่อ่านได้ ซึ่งช่วยให้อ่านอาร์เรย์ที่สร้างขึ้นแบบสุ่มได้อย่างมีประสิทธิภาพโดยไม่ต้องคัดลอก แทนที่จะใช้ขนาดก้อนข้อมูลที่กำหนดไว้ล่วงหน้าเป็น 1,024 ระบบจะพยายามเติมบัฟเฟอร์ที่นักพัฒนาแอปจัดหาให้ ซึ่งช่วยให้ควบคุมได้อย่างเต็มที่

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

กลไกของสตรีมที่เขียนได้

สตรีมที่เขียนได้คือปลายทางที่คุณเขียนข้อมูลได้ ซึ่งแสดงใน JavaScript ด้วยออบเจ็กต์ WritableStream ซึ่งทำหน้าที่เป็นแอบสแตรกชันเหนือซิงก์พื้นฐาน ซึ่งเป็นซิงก์ I/O ระดับล่างที่ใช้เขียนข้อมูลดิบ

ระบบจะเขียนข้อมูลลงในสตรีมผ่านโปรแกรมเขียนทีละก้อน โดยก้อนข้อมูลจะมีหลายรูปแบบเช่นเดียวกับก้อนข้อมูลในโปรแกรมอ่าน คุณใช้โค้ดใดก็ได้ตามต้องการเพื่อสร้าง ก้อนข้อมูลที่พร้อมสำหรับการเขียน โดยผู้เขียนและโค้ดที่เกี่ยวข้องจะเรียกว่าโปรดิวเซอร์

เมื่อสร้าง Writer และเริ่มเขียนไปยังสตรีม (Writer ที่ใช้งานอยู่) จะถือว่า Writer ล็อกกับสตรีมนั้น โดยมีผู้เขียนได้เพียงคนเดียวเท่านั้นที่เขียนไปยังสตรีมที่เขียนได้ในครั้งเดียว หากต้องการให้ผู้เขียนคนอื่นเริ่มเขียนไปยังสตรีมของคุณ โดยปกติแล้วคุณจะต้องปล่อยสตรีมก่อน จากนั้นจึงแนบผู้เขียนอีกคนไปยังสตรีม

คิวภายในจะติดตามก้อนข้อมูลที่เขียนลงในสตรีมแล้ว แต่ยังไม่ได้ ประมวลผลโดย Sink พื้นฐาน

กลยุทธ์การจัดคิวคือออบเจ็กต์ที่กำหนดวิธีที่สตรีมควรส่งสัญญาณแรงดันย้อนกลับตาม สถานะของคิวภายใน กลยุทธ์การจัดคิวจะกำหนดขนาดให้กับแต่ละก้อน และเปรียบเทียบ ขนาดทั้งหมดของก้อนทั้งหมดในคิวกับตัวเลขที่ระบุ ซึ่งเรียกว่าเครื่องหมายระดับสูง

โครงสร้างสุดท้ายเรียกว่าคอนโทรลเลอร์ สตรีมที่เขียนได้แต่ละรายการมีตัวควบคุมที่เชื่อมโยงซึ่ง ช่วยให้คุณควบคุมสตรีมได้ (เช่น ยกเลิก)

การสร้างสตรีมที่เขียนได้

อินเทอร์เฟซ WritableStream ของ Streams API มีการแยกข้อมูลมาตรฐานสำหรับการเขียนข้อมูลการสตรีมไปยังปลายทางที่เรียกว่า Sink ออบเจ็กต์นี้มาพร้อมกับแรงดันย้อนกลับและการจัดคิวในตัว คุณสร้างสตรีมที่เขียนได้โดย เรียกใช้ตัวสร้าง WritableStream() โดยมีพารามิเตอร์ underlyingSink ซึ่งไม่บังคับ ซึ่งแสดงออบเจ็กต์ ที่มีเมธอดและพร็อพเพอร์ตี้ที่กำหนดลักษณะการทำงานของอินสแตนซ์สตรีมที่สร้างขึ้น

underlyingSink

underlyingSink อาจมีเมธอดที่ไม่บังคับซึ่งนักพัฒนาแอปกำหนดไว้ดังต่อไปนี้ controller พารามิเตอร์ที่ส่งไปยังเมธอดบางรายการคือ WritableStreamDefaultController

  • start(controller): ระบบจะเรียกใช้เมธอดนี้ทันทีเมื่อสร้างออบเจ็กต์ เนื้อหาของเมธอดนี้ควรมีเป้าหมายเพื่อรับสิทธิ์เข้าถึง Sink พื้นฐาน หากต้องการดำเนินการนี้แบบไม่พร้อมกัน ก็สามารถส่งคืน Promise เพื่อส่งสัญญาณความสำเร็จหรือความล้มเหลวได้
  • write(chunk, controller): ระบบจะเรียกใช้เมธอดนี้เมื่อพร้อมที่จะเขียนข้อมูลก้อนใหม่ (ระบุไว้ในพารามิเตอร์ chunk) ลงใน Sink พื้นฐาน โดยจะคืนค่า Promise เพื่อ ส่งสัญญาณว่าการดำเนินการเขียนสำเร็จหรือไม่ ระบบจะเรียกใช้เมธอดนี้หลังจากที่การเขียนก่อนหน้าสำเร็จแล้วเท่านั้น และจะไม่เรียกใช้หลังจากที่สตรีมปิดหรือถูกยกเลิก
  • close(controller): ระบบจะเรียกใช้เมธอดนี้หากแอปส่งสัญญาณว่าเขียน ก้อนข้อมูลลงในสตรีมเสร็จแล้ว เนื้อหาควรทำทุกอย่างที่จำเป็นเพื่อเขียนข้อมูลไปยัง ปลายทางที่อยู่เบื้องหลังให้เสร็จสมบูรณ์ และปล่อยสิทธิ์เข้าถึง หากกระบวนการนี้เป็นแบบไม่พร้อมกัน ก็จะส่งคืน Promise เพื่อส่งสัญญาณความสำเร็จหรือความล้มเหลวได้ ระบบจะเรียกใช้วิธีนี้หลังจากที่การเขียนที่อยู่ในคิวทั้งหมด สำเร็จแล้วเท่านั้น
  • abort(reason): ระบบจะเรียกใช้เมธอดนี้หากแอปส่งสัญญาณว่าต้องการปิดสตรีมอย่างกะทันหัน และเปลี่ยนเป็นสถานะข้อผิดพลาด ซึ่งจะล้างทรัพยากรที่ระงับไว้ได้เหมือนกับ close() แต่ระบบจะเรียกใช้ abort() แม้ว่าจะมีการจัดคิวการเขียนก็ตาม ระบบจะทิ้ง ส่วนเหล่านั้น หากกระบวนการนี้เป็นแบบไม่พร้อมกัน ก็จะแสดงผล Promise เพื่อส่งสัญญาณความสำเร็จหรือความล้มเหลวได้ พารามิเตอร์ reasonมีDOMStringที่อธิบายสาเหตุที่สตรีมถูกยกเลิก
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

อินเทอร์เฟซของ Streams API ซึ่ง WritableStreamDefaultController แสดงถึงตัวควบคุมที่อนุญาตให้ควบคุมสถานะของ WritableStream ในระหว่างการตั้งค่า เมื่อส่งก้อนข้อมูลเพิ่มเติมเพื่อเขียน หรือเมื่อเขียนเสร็จแล้ว เมื่อสร้าง WritableStream ระบบจะให้อินสแตนซ์ WritableStreamDefaultController ที่เกี่ยวข้องแก่ Sink พื้นฐานเพื่อใช้จัดการ WritableStreamDefaultController มีเพียงวิธีเดียวคือ WritableStreamDefaultController.error() ซึ่งจะทำให้การโต้ตอบในอนาคตกับสตรีมที่เชื่อมโยงเกิดข้อผิดพลาด WritableStreamDefaultController ยังรองรับพร็อพเพอร์ตี้ signal ซึ่งจะแสดงผลอินสแตนซ์ของ AbortSignal ทำให้หยุดการดำเนินการ WritableStream ได้หากจำเป็น

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

อาร์กิวเมนต์ที่ 2 ของตัวสร้าง WritableStream() ซึ่งจะใส่หรือไม่ใส่ก็ได้คือ queuingStrategy เป็นออบเจ็กต์ที่กําหนดกลยุทธ์การจัดคิวสําหรับสตรีมได้ (ไม่บังคับ) ซึ่งมีพารามิเตอร์ 2 รายการดังนี้

  • highWaterMark: ตัวเลขที่ไม่เป็นลบซึ่งระบุเครื่องหมายสูงสุดของสตรีมโดยใช้กลยุทธ์การจัดคิวนี้
  • size(chunk): ฟังก์ชันที่คำนวณและแสดงผลขนาดที่ไม่เป็นลบแบบจำกัดของค่าก้อนที่ระบุ ระบบจะใช้ผลลัพธ์เพื่อกำหนดแรงดันย้อนกลับ ซึ่งจะแสดงผ่านพร็อพเพอร์ตี้ WritableStreamDefaultWriter.desiredSize ที่เหมาะสม

วิธีการ getWriter() และ write()

หากต้องการเขียนไปยังสตรีมที่เขียนได้ คุณต้องมี Writer ซึ่งจะเป็น WritableStreamDefaultWriter เมธอด getWriter() ของอินเทอร์เฟซ WritableStream จะแสดงอินสแตนซ์ใหม่ของ WritableStreamDefaultWriter และล็อกสตรีมไปยังอินสแตนซ์นั้น ขณะที่ล็อกสตรีมอยู่ คุณจะจ้างนักเขียนคนอื่นไม่ได้จนกว่าจะปล่อยตัวนักเขียนคนปัจจุบัน

เมธอด write() ของอินเทอร์เฟซ WritableStreamDefaultWriter จะเขียนกลุ่มข้อมูลที่ส่งไปยัง WritableStream และ Sink ที่เกี่ยวข้อง จากนั้นจะแสดงผล Promise ที่แก้ไขเพื่อระบุความสำเร็จหรือความล้มเหลวของการดำเนินการเขียน โปรดทราบว่า "สำเร็จ" หมายถึงอะไรนั้นขึ้นอยู่กับ Sink ที่เกี่ยวข้อง ซึ่งอาจบ่งบอกว่ายอมรับก้อนข้อมูลแล้ว และไม่จำเป็นต้องหมายความว่าระบบได้บันทึกก้อนข้อมูลอย่างปลอดภัยไปยังปลายทางสุดท้าย

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

พร็อพเพอร์ตี้ locked

คุณสามารถตรวจสอบว่าสตรีมที่เขียนได้ถูกล็อกหรือไม่โดยเข้าถึงพร็อพเพอร์ตี้ WritableStream.locked ของสตรีม

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

ตัวอย่างโค้ดสตรีมที่เขียนได้

โค้ดตัวอย่างด้านล่างแสดงขั้นตอนทั้งหมดในการดำเนินการ

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

การส่งสตรีมที่อ่านได้ไปยังสตรีมที่เขียนได้

คุณส่งสตรีมที่อ่านได้ไปยังสตรีมที่เขียนได้ผ่านเมธอด pipeTo() ของสตรีมที่อ่านได้ ReadableStream.pipeTo() จะส่ง ReadableStream ปัจจุบันไปยัง WritableStream ที่ระบุ และแสดงผลสัญญาที่ดำเนินการเมื่อกระบวนการส่งต่อเสร็จสมบูรณ์ หรือปฏิเสธหากพบข้อผิดพลาด

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

การสร้างสตรีมการเปลี่ยนรูปแบบ

TransformStream อินเทอร์เฟซของ Streams API แสดงชุดข้อมูลที่แปลงได้ คุณสร้างสตรีมการแปลงโดยการเรียกตัวสร้าง TransformStream() ซึ่งจะสร้างและส่งคืน ออบเจ็กต์สตรีมการแปลงจากแฮนเดิลที่ระบุ ตัวสร้าง TransformStream() รับออบเจ็กต์ JavaScript ที่ไม่บังคับเป็นอาร์กิวเมนต์แรก ซึ่งแสดงถึง transformer ออบเจ็กต์ดังกล่าวอาจมีเมธอดต่อไปนี้

transformer

  • start(controller): ระบบจะเรียกใช้เมธอดนี้ทันทีเมื่อสร้างออบเจ็กต์ โดยปกติแล้ว จะใช้เพื่อจัดคิวกลุ่มคำนำหน้าโดยใช้ controller.enqueue() ระบบจะอ่านข้อมูลจากฝั่งที่อ่านได้ แต่จะไม่ขึ้นอยู่กับการเขียนใดๆ ไปยังฝั่งที่เขียนได้ หากกระบวนการเริ่มต้นนี้เป็นแบบอะซิงโครนัส เช่น เนื่องจากต้องใช้ความพยายามในการรับกลุ่มคำนำหน้า ฟังก์ชันจะคืนค่า Promise เพื่อส่งสัญญาณว่าสำเร็จหรือล้มเหลว Promise ที่ถูกปฏิเสธจะทำให้เกิดข้อผิดพลาดในสตรีม ตัวสร้าง TransformStream() จะส่งข้อยกเว้นที่เกิดขึ้นอีกครั้ง
  • transform(chunk, controller): เมธอดนี้จะเรียกใช้เมื่อ Chunk ใหม่ที่เขียนลงในด้านที่เขียนได้พร้อมที่จะแปลง การติดตั้งใช้งานสตรีมรับประกันว่าฟังก์ชันนี้ จะเรียกใช้หลังจากที่การแปลงก่อนหน้าสำเร็จแล้วเท่านั้น และจะไม่มีการเรียกใช้ก่อนที่ start() จะ เสร็จสมบูรณ์หรือหลังจากเรียกใช้ flush() ฟังก์ชันนี้จะทําการแปลงจริง ของสตรีมการแปลง โดยจะจัดคิวผลลัพธ์ได้โดยใช้ controller.enqueue() ซึ่งจะทำให้ก้อนข้อมูลเดียวที่เขียนไปยังฝั่งที่เขียนได้ส่งผลให้มีก้อนข้อมูลเป็น 0 หรือหลายก้อนในฝั่งที่อ่านได้ ทั้งนี้ขึ้นอยู่กับจำนวนครั้งที่เรียกใช้ controller.enqueue() หากกระบวนการ แปลงเป็นแบบไม่พร้อมกัน ฟังก์ชันนี้จะแสดงผล Promise เพื่อส่งสัญญาณว่าการ แปลงสำเร็จหรือไม่ Promise ที่ถูกปฏิเสธจะทำให้เกิดข้อผิดพลาดทั้งด้านที่อ่านได้และเขียนได้ของ สตรีมการแปลง หากไม่ได้ระบุtransform() method ระบบจะใช้การแปลงข้อมูลระบุตัวตน ซึ่งจะ จัดคิวกลุ่มข้อมูลจากฝั่งที่เขียนได้ไปยังฝั่งที่อ่านได้โดยไม่มีการเปลี่ยนแปลง
  • flush(controller): เมธอดนี้จะเรียกใช้หลังจากที่เขียนก้อนข้อมูลทั้งหมดไปยังฝั่งที่เขียนได้แล้ว โดยการส่งผ่าน transform() เรียบร้อยแล้ว และฝั่งที่เขียนได้กำลังจะปิด โดยปกติแล้วจะใช้เพื่อจัดคิวกลุ่มคำต่อท้ายไปยังฝั่งที่อ่านได้ก่อนที่จะปิดฝั่งนั้นด้วย หากกระบวนการล้างข้อมูลเป็นแบบอะซิงโครนัส ฟังก์ชันจะคืนค่า Promise เพื่อ ส่งสัญญาณว่าสำเร็จหรือล้มเหลว และจะแจ้งผลลัพธ์ให้ผู้เรียกใช้ stream.writable.write() ทราบ นอกจากนี้ Promise ที่ถูกปฏิเสธจะทำให้เกิดข้อผิดพลาดทั้งในด้านที่อ่านได้และด้านที่เขียนได้ของสตรีม การส่งข้อยกเว้นจะถือว่าเหมือนกับการส่งคืน Promise ที่ถูกปฏิเสธ
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

กลยุทธ์การจัดคิว writableStrategy และ readableStrategy

พารามิเตอร์ที่ 2 และ 3 ที่ไม่บังคับของตัวสร้าง TransformStream() คือ writableStrategy และreadableStrategy กลยุทธ์การจัดคิว โดยจะกำหนดตามที่ระบุไว้ในส่วนสตรีมที่อ่านได้และเขียนได้ตามลำดับ

ตัวอย่างโค้ดสตรีมการแปลง

ตัวอย่างโค้ดต่อไปนี้แสดงสตรีมการแปลงอย่างง่ายที่ทำงาน

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

การส่งสตรีมที่อ่านได้ผ่านสตรีมการแปลง

เมธอด pipeThrough() ของอินเทอร์เฟซ ReadableStream มีวิธีที่เชื่อมต่อกันได้ในการส่งสตรีมปัจจุบัน ผ่านสตรีมการเปลี่ยนรูปแบบหรือคู่ที่เขียน/อ่านได้อื่นๆ โดยทั่วไป การส่งสตรีมผ่านไปป์จะล็อก สตรีมนั้นตลอดระยะเวลาของไปป์ ซึ่งจะป้องกันไม่ให้ผู้อ่านคนอื่นๆ ล็อกสตรีม

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

ตัวอย่างโค้ดถัดไป (ซึ่งอาจดูซับซ้อนเล็กน้อย) แสดงวิธีใช้ fetch() เวอร์ชัน "ตะโกน" ซึ่งจะเปลี่ยนข้อความทั้งหมดเป็นตัวพิมพ์ใหญ่โดยใช้สัญญาการตอบกลับที่ส่งคืน เป็นสตรีม และเปลี่ยนเป็นตัวพิมพ์ใหญ่ทีละกลุ่ม ข้อดีของแนวทางนี้คือคุณไม่จำเป็นต้องรอให้ระบบดาวน์โหลดเอกสารทั้งหมด ซึ่งอาจสร้างความแตกต่างอย่างมากเมื่อต้องจัดการกับไฟล์ขนาดใหญ่

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

สาธิต

การสาธิตด้านล่างแสดงสตรีมที่อ่านได้ เขียนได้ และแปลงได้ในการทำงาน นอกจากนี้ ยังมีตัวอย่าง ของเชนไปป์ pipeThrough() และ pipeTo() รวมถึงแสดงให้เห็นถึง tee() คุณเลือกเรียกใช้เดโมในหน้าต่างของตัวเองหรือดูซอร์สโค้ดได้

สตรีมที่มีประโยชน์ซึ่งพร้อมใช้งานในเบราว์เซอร์

เบราว์เซอร์มีสตรีมที่มีประโยชน์หลายอย่างในตัว คุณสร้าง ReadableStreamจาก Blob ได้อย่างง่ายดาย เมธอด stream() ของอินเทอร์เฟซ Blob จะแสดงผล ReadableStream ซึ่งเมื่ออ่านแล้วจะแสดงผลข้อมูลที่อยู่ใน Blob นอกจากนี้ โปรดทราบว่าออบเจ็กต์ File คือออบเจ็กต์ Blob ประเภทหนึ่ง และสามารถใช้ในบริบทใดก็ได้ที่ใช้ Blob ได้

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

รูปแบบการสตรีมของ TextDecoder.decode() และ TextEncoder.encode() เรียกว่า TextDecoderStream และ TextEncoderStream ตามลำดับ

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

การบีบอัดหรือคลายการบีบอัดไฟล์ทำได้ง่ายๆ ด้วยสตรีมการแปลงCompressionStreamและDecompressionStream ตามลำดับ ตัวอย่างโค้ดด้านล่างแสดงวิธีดาวน์โหลดสเปคของสตรีม บีบอัด (gzip) ในเบราว์เซอร์ และเขียนไฟล์ที่บีบอัดลงในดิสก์โดยตรง

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access API FileSystemWritableFileStream และfetch()สตรีมคำขอเวอร์ชันทดลองเป็น ตัวอย่างของสตรีมที่เขียนได้ในสภาพแวดล้อมจริง

Serial API ใช้สตรีมที่อ่านได้และเขียนได้ทั้ง 2 อย่างอย่างมาก

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

สุดท้าย API WebSocketStream จะผสานรวมสตรีมกับ WebSocket API

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

แหล่งข้อมูลที่มีประโยชน์

คำขอบคุณ

บทความนี้ได้รับการตรวจสอบโดย Jake Archibald François Beaufort Sam Dutton Mattias Buelens Surma Joe Medley และ Adam Rice บล็อกโพสต์ของ Jake Archibald ช่วยให้ฉันเข้าใจสตรีมได้มาก ตัวอย่างโค้ดบางส่วนได้รับแรงบันดาลใจจากการสำรวจของผู้ใช้ GitHub @bellbind และส่วนของข้อความอิงตามเอกสารประกอบบนเว็บ MDN เกี่ยวกับสตรีม มาตรฐาน Streams ผู้เขียนได้สร้างผลงานที่ยอดเยี่ยม ในการเขียนข้อกำหนดนี้ รูปภาพฮีโร่โดย Ryan Lara ใน Unsplash