Luồng — Hướng dẫn chính thức

Tìm hiểu cách sử dụng các luồng có thể đọc, ghi và chuyển đổi bằng Streams API.

Streams API cho phép bạn truy cập theo phương thức lập trình vào các luồng dữ liệu nhận được qua mạng hoặc được tạo bằng bất kỳ phương tiện nào trên máy và xử lý các luồng đó bằng JavaScript. Truyền trực tuyến là quá trình chia nhỏ tài nguyên mà bạn muốn nhận, gửi hoặc chuyển đổi thành các phần nhỏ, sau đó xử lý từng phần một. Mặc dù phát trực tuyến là một tính năng mà trình duyệt thực hiện khi nhận các thành phần như HTML hoặc video để hiển thị trên trang web, nhưng JavaScript chưa bao giờ có tính năng này trước khi fetch với luồng được ra mắt vào năm 2015.

Trước đây, nếu muốn xử lý một loại tài nguyên (cho dù đó là video hay tệp văn bản, v.v.), bạn sẽ phải tải toàn bộ tệp xuống, đợi tệp được chuyển đổi tuần tự sang định dạng phù hợp, sau đó xử lý tệp đó. Khi luồng có sẵn cho JavaScript, tất cả điều này sẽ thay đổi. Giờ đây, bạn có thể xử lý dữ liệu thô bằng JavaScript một cách liên tục ngay khi dữ liệu đó có trên ứng dụng khách, mà không cần tạo vùng đệm, chuỗi hoặc blob. Điều này mở ra một số trường hợp sử dụng, một số trường hợp tôi liệt kê dưới đây:

  • Hiệu ứng video: chuyển luồng video có thể đọc được thông qua luồng chuyển đổi áp dụng hiệu ứng theo thời gian thực.
  • (Huỷ) nén dữ liệu: chuyển một luồng tệp thông qua một luồng chuyển đổi (huỷ) nén luồng đó một cách có chọn lọc.
  • Giải mã hình ảnh: chuyển luồng phản hồi HTTP thông qua luồng chuyển đổi giải mã các byte thành dữ liệu bitmap, sau đó thông qua một luồng chuyển đổi khác để dịch bitmap thành PNG. Nếu được cài đặt bên trong trình xử lý fetch của trình chạy dịch vụ, thì bạn có thể polyfill minh bạch các định dạng hình ảnh mới như AVIF.

Hỗ trợ trình duyệt

ReadableStream và WritableStream

Hỗ trợ trình duyệt

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Nguồn

TransformStream

Hỗ trợ trình duyệt

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Nguồn

Các khái niệm cốt lõi

Trước khi đi vào chi tiết về các loại luồng, tôi xin giới thiệu một số khái niệm cốt lõi.

Mảnh

Phân đoạn là một phần dữ liệu được ghi vào hoặc đọc từ một luồng. Loại dữ liệu này có thể là bất kỳ; luồng thậm chí có thể chứa các phần của nhiều loại dữ liệu. Trong hầu hết trường hợp, một đoạn sẽ không phải là đơn vị dữ liệu nguyên tử nhất cho một luồng nhất định. Ví dụ: luồng byte có thể chứa các đoạn bao gồm 16 đơn vị Uint8Array KiB, thay vì các byte đơn lẻ.

Luồng có thể đọc

Luồng có thể đọc là một nguồn dữ liệu mà bạn có thể đọc. Nói cách khác, dữ liệu được tạo ra từ một luồng có thể đọc được. Cụ thể, một luồng có thể đọc được là một thực thể của lớp ReadableStream.

Luồng có thể ghi

Luồng có thể ghi là một đích đến cho dữ liệu mà bạn có thể ghi vào. Nói cách khác, dữ liệu vào một luồng có thể ghi. Cụ thể, một luồng có thể ghi là một thực thể của lớp WritableStream.

Biến đổi luồng

Luồng biến đổi bao gồm một cặp luồng: một luồng có thể ghi, còn gọi là bên có thể ghi và một luồng có thể đọc, còn gọi là bên có thể đọc. Một phép ẩn dụ thực tế cho điều này là một người phiên dịch đồng thời, người dịch từ ngôn ngữ này sang ngôn ngữ khác một cách nhanh chóng. Theo cách dành riêng cho luồng chuyển đổi, việc ghi vào phía có thể ghi sẽ dẫn đến việc dữ liệu mới được cung cấp để đọc từ phía có thể đọc. Cụ thể, mọi đối tượng có thuộc tính writable và thuộc tính readable đều có thể đóng vai trò là luồng biến đổi. Tuy nhiên, lớp TransformStream chuẩn giúp bạn dễ dàng tạo một cặp như vậy được xoắn đúng cách.

Chuỗi ống

Luồng chủ yếu được sử dụng bằng cách chuyển các luồng đó cho nhau. Bạn có thể chuyển trực tiếp luồng có thể đọc sang luồng có thể ghi bằng phương thức pipeTo() của luồng có thể đọc, hoặc chuyển luồng có thể đọc qua một hoặc nhiều luồng biến đổi trước, bằng phương thức pipeThrough() của luồng có thể đọc. Một tập hợp các luồng được chuyển cùng nhau theo cách này được gọi là chuỗi ống.

Áp lực ngược

Sau khi tạo một chuỗi ống, chuỗi này sẽ truyền các tín hiệu về tốc độ truyền các đoạn thông qua chuỗi đó. Nếu bất kỳ bước nào trong chuỗi chưa thể chấp nhận các đoạn, thì bước đó sẽ truyền tín hiệu ngược lại thông qua chuỗi ống, cho đến khi nguồn ban đầu được yêu cầu ngừng tạo các đoạn quá nhanh. Quá trình chuẩn hoá luồng này được gọi là áp lực ngược.

Teeing

Bạn có thể tạo một luồng có thể đọc được (được đặt tên theo hình dạng của chữ "T" viết hoa) bằng phương thức tee(). Thao tác này sẽ khoá luồng, tức là không thể sử dụng trực tiếp luồng đó nữa; tuy nhiên, thao tác này sẽ tạo 2 luồng mới, được gọi là nhánh, có thể được sử dụng độc lập. Việc tạo điểm xuất phát cũng rất quan trọng vì bạn không thể tua lại hoặc bắt đầu lại luồng phát. Chúng ta sẽ tìm hiểu thêm về vấn đề này sau.

Sơ đồ của một chuỗi ống bao gồm một luồng có thể đọc được từ lệnh gọi đến API tìm nạp, sau đó được chuyển qua một luồng biến đổi có đầu ra được phân nhánh rồi gửi đến trình duyệt cho luồng có thể đọc được đầu tiên và đến bộ nhớ đệm của worker dịch vụ cho luồng có thể đọc được thứ hai.
Chuỗi ống.

Cơ chế của luồng có thể đọc

Luồng có thể đọc là một nguồn dữ liệu được biểu thị trong JavaScript bằng đối tượng ReadableStream chảy từ một nguồn cơ bản. Hàm khởi tạo ReadableStream() tạo và trả về một đối tượng luồng có thể đọc được từ các trình xử lý đã cho. Có hai loại nguồn cơ bản:

  • Nguồn đẩy liên tục đẩy dữ liệu đến bạn khi bạn truy cập vào nguồn đó. Bạn có thể bắt đầu, tạm dừng hoặc huỷ quyền truy cập vào luồng dữ liệu. Ví dụ: luồng video trực tiếp, sự kiện do máy chủ gửi hoặc WebSocket.
  • Nguồn dữ liệu lấy yêu cầu bạn phải yêu cầu dữ liệu một cách rõ ràng từ các nguồn đó sau khi kết nối. Ví dụ: các thao tác HTTP thông qua lệnh gọi fetch() hoặc XMLHttpRequest.

Dữ liệu luồng được đọc tuần tự theo các phần nhỏ được gọi là mảng. Các đoạn được đặt trong luồng được gọi là đã thêm vào hàng đợi. Điều này có nghĩa là các thư này đang chờ trong hàng đợi sẵn sàng để đọc. Hàng đợi nội bộ theo dõi các đoạn chưa được đọc.

Chiến lược xếp hàng là một đối tượng xác định cách luồng sẽ báo hiệu áp lực ngược dựa trên trạng thái của hàng đợi nội bộ. Chiến lược xếp hàng gán kích thước cho mỗi phần và so sánh tổng kích thước của tất cả các phần trong hàng đợi với một số đã chỉ định, được gọi là mốc nước cao.

Các đoạn bên trong luồng được trình đọc đọc. Trình đọc này truy xuất dữ liệu từng phần một, cho phép bạn thực hiện bất kỳ thao tác nào bạn muốn trên dữ liệu đó. Trình đọc cùng với mã xử lý khác đi kèm được gọi là người dùng.

Cấu trúc tiếp theo trong ngữ cảnh này được gọi là trình điều khiển. Mỗi luồng có thể đọc được đều có một trình điều khiển liên kết. Như tên gọi, trình điều khiển này cho phép bạn kiểm soát luồng.

Mỗi lần chỉ có một trình đọc có thể đọc luồng; khi một trình đọc được tạo và bắt đầu đọc luồng (nghĩa là trở thành trình đọc đang hoạt động), trình đọc đó sẽ bị khoá với luồng đó. Nếu muốn một trình đọc khác tiếp quản việc đọc luồng, bạn thường cần giải phóng trình đọc đầu tiên trước khi làm bất cứ việc gì khác (mặc dù bạn có thể chia luồng).

Tạo luồng có thể đọc

Bạn tạo một luồng có thể đọc được bằng cách gọi hàm khởi tạo ReadableStream(). Hàm khởi tạo có một đối số không bắt buộc underlyingSource, đại diện cho một đối tượng có các phương thức và thuộc tính xác định cách thực thể luồng được tạo sẽ hoạt động.

underlyingSource

Phương thức này có thể sử dụng các phương thức không bắt buộc do nhà phát triển xác định sau đây:

  • start(controller): Được gọi ngay khi đối tượng được tạo. Phương thức này có thể truy cập vào nguồn luồng và thực hiện mọi thao tác cần thiết khác để thiết lập chức năng luồng. Nếu quá trình này được thực hiện không đồng bộ, phương thức có thể trả về một lời hứa để báo hiệu thành công hoặc không thành công. Tham số controller được truyền đến phương thức này là một ReadableStreamDefaultController.
  • pull(controller): Có thể dùng để kiểm soát luồng khi tìm nạp thêm các đoạn. Phương thức này được gọi lặp lại miễn là hàng đợi nội bộ của các đoạn trong luồng không đầy, cho đến khi hàng đợi đạt đến điểm cao. Nếu kết quả của lệnh gọi pull() là một lời hứa, thì pull() sẽ không được gọi lại cho đến khi lời hứa đó được thực hiện. Nếu lời hứa từ chối, luồng sẽ gặp lỗi.
  • cancel(reason): Được gọi khi trình tiêu thụ luồng huỷ luồng.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController hỗ trợ các phương thức sau:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Đối số thứ hai (cũng không bắt buộc) của hàm khởi tạo ReadableStream()queuingStrategy. Đây là một đối tượng xác định chiến lược xếp hàng cho luồng (không bắt buộc), trong đó có hai tham số:

  • highWaterMark: Một số không âm cho biết điểm cao của luồng bằng cách sử dụng chiến lược xếp hàng này.
  • size(chunk): Hàm tính toán và trả về kích thước hữu hạn không âm của giá trị khối đã cho. Kết quả được dùng để xác định áp lực ngược, thể hiện thông qua thuộc tính ReadableStreamDefaultController.desiredSize thích hợp. Phương thức này cũng điều chỉnh thời điểm gọi phương thức pull() của nguồn cơ bản.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Phương thức getReader()read()

Để đọc từ một luồng có thể đọc được, bạn cần có một trình đọc, đó sẽ là một ReadableStreamDefaultReader. Phương thức getReader() của giao diện ReadableStream tạo một trình đọc và khoá luồng vào trình đọc đó. Khi luồng bị khoá, bạn không thể lấy trình đọc nào khác cho đến khi trình đọc này được phát hành.

Phương thức read() của giao diện ReadableStreamDefaultReader trả về một lời hứa cung cấp quyền truy cập vào phần tiếp theo trong hàng đợi nội bộ của luồng. Phương thức này thực hiện hoặc từ chối bằng một kết quả tuỳ thuộc vào trạng thái của luồng. Có các khả năng sau:

  • Nếu có một phần, lời hứa sẽ được thực hiện bằng một đối tượng ở dạng
    { value: chunk, done: false }.
  • Nếu luồng bị đóng, lời hứa sẽ được thực hiện bằng một đối tượng có dạng
    { value: undefined, done: true }.
  • Nếu luồng gặp lỗi, lời hứa sẽ bị từ chối kèm theo lỗi liên quan.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Thuộc tính locked

Bạn có thể kiểm tra xem một luồng có thể đọc được có bị khoá hay không bằng cách truy cập vào thuộc tính ReadableStream.locked của luồng đó.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Mã mẫu luồng có thể đọc

Mã mẫu bên dưới cho thấy tất cả các bước trong thực tế. Trước tiên, bạn tạo một ReadableStream trong đối số underlyingSource (tức là lớp TimestampSource) xác định một phương thức start(). Phương thức này yêu cầu controller của luồng enqueue() một dấu thời gian mỗi giây trong 10 giây. Cuối cùng, lớp này sẽ yêu cầu trình điều khiển close() luồng. Bạn sử dụng luồng này bằng cách tạo một trình đọc thông qua phương thức getReader() và gọi read() cho đến khi luồng là done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Lặp lại không đồng bộ

Việc kiểm tra trong mỗi vòng lặp read() nếu luồng là done có thể không phải là API thuận tiện nhất. May mắn thay, sẽ sớm có một cách tốt hơn để thực hiện việc này: lặp lại không đồng bộ.

for await (const chunk of stream) {
  console.log(chunk);
}

Một giải pháp để sử dụng hoạt động lặp lại không đồng bộ hiện nay là triển khai hành vi này bằng polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Tạo luồng có thể đọc được

Phương thức tee() của giao diện ReadableStream sẽ tạo luồng có thể đọc hiện tại, trả về một mảng gồm hai phần tử chứa hai nhánh kết quả dưới dạng các thực thể ReadableStream mới. Điều này cho phép hai trình đọc đọc đồng thời một luồng. Bạn có thể thực hiện việc này, chẳng hạn như trong trình chạy dịch vụ nếu muốn tìm nạp phản hồi từ máy chủ và truyền phản hồi đó đến trình duyệt, đồng thời truyền phản hồi đó đến bộ nhớ đệm của trình chạy dịch vụ. Vì không thể sử dụng một nội dung phản hồi nhiều lần, nên bạn cần có hai bản sao để thực hiện việc này. Để huỷ luồng, bạn cần huỷ cả hai nhánh kết quả. Việc tạo luồng đầu vào đầu ra (tee) thường sẽ khoá luồng đó trong khoảng thời gian đó, ngăn các trình đọc khác khoá luồng đó.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Luồng byte có thể đọc

Đối với các luồng đại diện cho byte, một phiên bản mở rộng của luồng có thể đọc được cung cấp để xử lý các byte một cách hiệu quả, cụ thể là bằng cách giảm thiểu số bản sao. Luồng byte cho phép thu nạp trình đọc mang theo vùng đệm của riêng bạn (BYOB). Phương thức triển khai mặc định có thể cung cấp nhiều đầu ra khác nhau, chẳng hạn như chuỗi hoặc vùng đệm mảng trong trường hợp WebSocket, trong khi luồng byte đảm bảo đầu ra byte. Ngoài ra, trình đọc BYOB còn có các lợi ích về độ ổn định. Lý do là nếu một vùng đệm tách ra, thì vùng đệm đó có thể đảm bảo rằng không ghi vào cùng một vùng đệm hai lần, do đó tránh được tình trạng tương tranh. Trình đọc BYOB có thể giảm số lần trình duyệt cần chạy quy trình thu thập rác vì trình đọc này có thể sử dụng lại vùng đệm.

Tạo luồng byte có thể đọc

Bạn có thể tạo một luồng byte có thể đọc được bằng cách truyền một tham số type bổ sung vào hàm khởi tạo ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Nguồn cơ bản của luồng byte có thể đọc được được cung cấp ReadableByteStreamController để thao tác. Phương thức ReadableByteStreamController.enqueue() của lớp này nhận một đối số chunk có giá trị là ArrayBufferView. Thuộc tính ReadableByteStreamController.byobRequest trả về yêu cầu kéo BYOB hiện tại hoặc giá trị rỗng nếu không có yêu cầu nào. Cuối cùng, thuộc tính ReadableByteStreamController.desiredSize sẽ trả về kích thước mong muốn để lấp đầy hàng đợi nội bộ của luồng được kiểm soát.

queuingStrategy

Đối số thứ hai (cũng không bắt buộc) của hàm khởi tạo ReadableStream()queuingStrategy. Đây là một đối tượng xác định chiến lược xếp hàng cho luồng (không bắt buộc), lấy một tham số:

  • highWaterMark: Số byte không âm cho biết điểm cao của luồng khi sử dụng chiến lược xếp hàng này. Thông tin này được dùng để xác định áp lực ngược, thể hiện thông qua thuộc tính ReadableByteStreamController.desiredSize thích hợp. Phương thức này cũng điều chỉnh thời điểm gọi phương thức pull() của nguồn cơ bản.

Phương thức getReader()read()

Sau đó, bạn có thể truy cập vào ReadableStreamBYOBReader bằng cách đặt tham số mode cho phù hợp: ReadableStream.getReader({ mode: "byob" }). Điều này cho phép kiểm soát chính xác hơn việc phân bổ vùng đệm để tránh sao chép. Để đọc từ luồng byte, bạn cần gọi ReadableStreamBYOBReader.read(view), trong đó viewArrayBufferView.

Đoạn mã mẫu về luồng byte có thể đọc

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Hàm sau đây trả về các luồng byte có thể đọc, cho phép đọc hiệu quả một mảng được tạo ngẫu nhiên mà không cần sao chép. Thay vì sử dụng kích thước khối được xác định trước là 1.024, phương thức này sẽ cố gắng lấp đầy vùng đệm do nhà phát triển cung cấp, cho phép kiểm soát toàn bộ.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Cơ chế của luồng có thể ghi

Luồng có thể ghi là một đích đến mà bạn có thể ghi dữ liệu, được biểu thị trong JavaScript bằng đối tượng WritableStream. Đây là một thành phần trừu tượng ở đầu chậu lưu trữ cơ bản – một chậu lưu trữ I/O cấp thấp hơn mà dữ liệu thô được ghi vào.

Dữ liệu được ghi vào luồng thông qua một trình ghi, mỗi lần một phần. Một đoạn có thể có nhiều hình thức, giống như các đoạn trong trình đọc. Bạn có thể sử dụng bất kỳ mã nào bạn muốn để tạo các đoạn sẵn sàng để ghi; trình ghi cùng với mã liên kết được gọi là trình tạo.

Khi một trình ghi được tạo và bắt đầu ghi vào một luồng (trình ghi đang hoạt động), thì trình ghi đó được cho là bị khoá với luồng đó. Mỗi lần chỉ có một trình ghi có thể ghi vào luồng có thể ghi. Nếu muốn một trình ghi khác bắt đầu ghi vào luồng, bạn thường cần phát hành luồng đó trước khi đính kèm một trình ghi khác vào luồng đó.

Bộ đợi nội bộ theo dõi các đoạn đã được ghi vào luồng nhưng chưa được bồn lưu trữ cơ bản xử lý.

Chiến lược xếp hàng là một đối tượng xác định cách luồng sẽ báo hiệu áp lực ngược dựa trên trạng thái của hàng đợi nội bộ. Chiến lược xếp hàng gán kích thước cho mỗi phần và so sánh tổng kích thước của tất cả các phần trong hàng đợi với một số đã chỉ định, được gọi là mốc nước cao.

Cấu trúc cuối cùng được gọi là trình điều khiển. Mỗi luồng có thể ghi đều có một trình điều khiển liên kết cho phép bạn kiểm soát luồng đó (ví dụ: để huỷ luồng).

Tạo luồng có thể ghi

Giao diện WritableStream của API Luồng cung cấp một bản tóm tắt chuẩn để ghi dữ liệu truyền trực tuyến đến một đích đến, còn gọi là bồn lưu trữ. Đối tượng này có áp lực ngược và xếp hàng tích hợp. Bạn tạo một luồng có thể ghi bằng cách gọi hàm khởi tạo của luồng đó là WritableStream(). Lớp này có một tham số underlyingSink không bắt buộc, đại diện cho một đối tượng có các phương thức và thuộc tính xác định cách hoạt động của thực thể luồng đã tạo.

underlyingSink

underlyingSink có thể bao gồm các phương thức không bắt buộc do nhà phát triển xác định sau đây. Tham số controller được truyền đến một số phương thức là WritableStreamDefaultController.

  • start(controller): Phương thức này được gọi ngay khi đối tượng được tạo. Nội dung của phương thức này phải nhằm mục đích truy cập vào bồn lưu trữ cơ bản. Nếu quá trình này được thực hiện không đồng bộ, thì quá trình này có thể trả về một lời hứa để báo hiệu thành công hoặc không thành công.
  • write(chunk, controller): Phương thức này sẽ được gọi khi một phần dữ liệu mới (được chỉ định trong tham số chunk) đã sẵn sàng để ghi vào vùng lưu trữ cơ bản. Phương thức này có thể trả về một lời hứa để báo hiệu thao tác ghi thành công hoặc không thành công. Phương thức này sẽ chỉ được gọi sau khi các lần ghi trước đó thành công và không bao giờ được gọi sau khi luồng bị đóng hoặc bị huỷ.
  • close(controller): Phương thức này sẽ được gọi nếu ứng dụng báo hiệu rằng đã hoàn tất việc ghi các đoạn vào luồng. Nội dung phải làm mọi việc cần thiết để hoàn tất quá trình ghi vào bồn lưu trữ cơ bản và giải phóng quyền truy cập vào bồn lưu trữ đó. Nếu không đồng bộ, quá trình này có thể trả về một lời hứa để báo hiệu trạng thái thành công hoặc không thành công. Phương thức này sẽ chỉ được gọi sau khi tất cả các thao tác ghi vào hàng đợi đều thành công.
  • abort(reason): Phương thức này sẽ được gọi nếu ứng dụng báo hiệu rằng ứng dụng muốn đóng đột ngột luồng và đặt luồng đó ở trạng thái lỗi. Phương thức này có thể dọn dẹp mọi tài nguyên được giữ lại, giống như close(), nhưng abort() sẽ được gọi ngay cả khi các hoạt động ghi được đưa vào hàng đợi. Các khối đó sẽ bị loại bỏ. Nếu không đồng bộ, quá trình này có thể trả về một lời hứa để báo hiệu trạng thái thành công hoặc không thành công. Thông số reason chứa DOMString mô tả lý do luồng bị huỷ.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Giao diện WritableStreamDefaultController của API Luồng đại diện cho một trình điều khiển cho phép kiểm soát trạng thái của WritableStream trong quá trình thiết lập, khi nhiều phần được gửi để ghi hoặc khi kết thúc quá trình ghi. Khi tạo WritableStream, bồn lưu trữ cơ bản sẽ được cấp một thực thể WritableStreamDefaultController tương ứng để thao tác. WritableStreamDefaultController chỉ có một phương thức: WritableStreamDefaultController.error(), khiến mọi lượt tương tác trong tương lai với luồng được liên kết đều gặp lỗi. WritableStreamDefaultController cũng hỗ trợ thuộc tính signal trả về một thực thể của AbortSignal, cho phép dừng thao tác WritableStream nếu cần.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Đối số thứ hai (cũng không bắt buộc) của hàm khởi tạo WritableStream()queuingStrategy. Đây là một đối tượng xác định chiến lược xếp hàng cho luồng (không bắt buộc), trong đó có hai tham số:

  • highWaterMark: Một số không âm cho biết điểm cao của luồng bằng cách sử dụng chiến lược xếp hàng này.
  • size(chunk): Hàm tính toán và trả về kích thước hữu hạn không âm của giá trị khối đã cho. Kết quả được dùng để xác định áp lực ngược, thể hiện thông qua thuộc tính WritableStreamDefaultWriter.desiredSize thích hợp.

Phương thức getWriter()write()

Để ghi vào luồng có thể ghi, bạn cần có một trình ghi, đó sẽ là WritableStreamDefaultWriter. Phương thức getWriter() của giao diện WritableStream trả về một thực thể mới của WritableStreamDefaultWriter và khoá luồng vào thực thể đó. Khi luồng bị khoá, bạn không thể thu nạp trình ghi nào khác cho đến khi trình ghi hiện tại được phát hành.

Phương thức write() của giao diện WritableStreamDefaultWriter ghi một phần dữ liệu đã truyền vào WritableStream và bồn lưu trữ cơ bản của phần dữ liệu đó, sau đó trả về một lời hứa phân giải để cho biết thao tác ghi đã thành công hay không. Xin lưu ý rằng ý nghĩa của "thành công" là tuỳ thuộc vào bồn lưu trữ cơ bản; trạng thái này có thể cho biết rằng phần đã được chấp nhận và không nhất thiết là phần đó được lưu an toàn vào đích cuối cùng.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Thuộc tính locked

Bạn có thể kiểm tra xem một luồng có thể ghi có bị khoá hay không bằng cách truy cập vào thuộc tính WritableStream.locked của luồng đó.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Mẫu mã luồng có thể ghi

Mã mẫu bên dưới cho thấy tất cả các bước trong thực tế.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Chuyển luồng có thể đọc sang luồng có thể ghi

Bạn có thể chuyển luồng có thể đọc sang luồng có thể ghi thông qua phương thức pipeTo() của luồng có thể đọc. ReadableStream.pipeTo() chuyển ReadableStream hiện tại đến một WritableStream nhất định và trả về một lời hứa thực hiện khi quá trình chuyển hoàn tất thành công hoặc từ chối nếu gặp lỗi.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Tạo luồng biến đổi

Giao diện TransformStream của API Luồng dữ liệu đại diện cho một tập hợp dữ liệu có thể chuyển đổi. Bạn tạo luồng biến đổi bằng cách gọi hàm khởi tạo TransformStream(). Hàm này sẽ tạo và trả về đối tượng luồng biến đổi từ các trình xử lý đã cho. Hàm khởi tạo TransformStream() chấp nhận đối số đầu tiên là một đối tượng JavaScript không bắt buộc đại diện cho transformer. Các đối tượng như vậy có thể chứa bất kỳ phương thức nào sau đây:

transformer

  • start(controller): Phương thức này được gọi ngay khi đối tượng được tạo. Thông thường, phương thức này được dùng để thêm các đoạn tiền tố vào hàng đợi bằng controller.enqueue(). Các đoạn đó sẽ được đọc từ phía có thể đọc nhưng không phụ thuộc vào bất kỳ hoạt động ghi nào vào phía có thể ghi. Nếu quy trình ban đầu này không đồng bộ, ví dụ: vì cần một chút nỗ lực để thu nạp các đoạn tiền tố, hàm có thể trả về một lời hứa để báo hiệu thành công hoặc không thành công; lời hứa bị từ chối sẽ gây lỗi cho luồng. Mọi ngoại lệ được gửi sẽ được hàm khởi tạo TransformStream() gửi lại.
  • transform(chunk, controller): Phương thức này được gọi khi một đoạn mới ban đầu được ghi vào bên có thể ghi đã sẵn sàng để chuyển đổi. Việc triển khai luồng đảm bảo rằng hàm này chỉ được gọi sau khi các phép biến đổi trước đó thành công và không bao giờ được gọi trước khi start() hoàn tất hoặc sau khi flush() được gọi. Hàm này thực hiện công việc biến đổi thực tế của luồng biến đổi. Phương thức này có thể thêm kết quả vào hàng đợi bằng controller.enqueue(). Điều này cho phép một phần được ghi vào phía có thể ghi dẫn đến 0 hoặc nhiều phần trên phía có thể đọc, tuỳ thuộc vào số lần controller.enqueue() được gọi. Nếu quá trình chuyển đổi không đồng bộ, hàm này có thể trả về một lời hứa để báo hiệu quá trình chuyển đổi thành công hay không thành công. Lời hứa bị từ chối sẽ gây lỗi cho cả hai phía có thể đọc và ghi của luồng chuyển đổi. Nếu không cung cấp phương thức transform(), thì phép biến đổi nhận dạng sẽ được sử dụng, giúp thêm các đoạn không thay đổi từ phía có thể ghi vào hàng đợi phía có thể đọc.
  • flush(controller): Phương thức này được gọi sau khi tất cả các đoạn được ghi vào phía có thể ghi đã được chuyển đổi bằng cách truyền thành công qua transform() và phía có thể ghi sắp đóng. Thông thường, phương thức này được dùng để thêm các đoạn hậu tố vào hàng đợi cho phía có thể đọc, trước khi phía đó đóng lại. Nếu quá trình xả không đồng bộ, hàm có thể trả về một lời hứa để báo hiệu thành công hoặc không thành công; kết quả sẽ được thông báo cho phương thức gọi của stream.writable.write(). Ngoài ra, một lời hứa bị từ chối sẽ gây lỗi cho cả hai bên có thể đọc và ghi của luồng. Việc gửi một ngoại lệ được xử lý giống như việc trả về một lời hứa bị từ chối.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Chiến lược xếp hàng writableStrategyreadableStrategy

Các tham số không bắt buộc thứ hai và thứ ba của hàm khởi tạo TransformStream() là các chiến lược xếp hàng writableStrategyreadableStrategy không bắt buộc. Các dòng này được xác định như đã nêu trong phần luồng có thể đọccó thể ghi tương ứng.

Mẫu mã luồng biến đổi

Mã mẫu sau đây cho thấy một luồng biến đổi đơn giản đang hoạt động.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Chuyển luồng có thể đọc được qua luồng chuyển đổi

Phương thức pipeThrough() của giao diện ReadableStream cung cấp một cách thức có thể tạo chuỗi để chuyển luồng hiện tại thông qua luồng biến đổi hoặc bất kỳ cặp có thể ghi/đọc nào khác. Việc chuyển luồng thường sẽ khoá luồng đó trong suốt thời gian chuyển, ngăn các trình đọc khác khoá luồng đó.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Mã mẫu tiếp theo (hơi gượng ép) cho thấy cách bạn có thể triển khai phiên bản "gọi to" của fetch() để viết hoa tất cả văn bản bằng cách sử dụng lời hứa phản hồi được trả về dưới dạng luồng và viết hoa từng phần. Ưu điểm của phương pháp này là bạn không cần phải đợi tải toàn bộ tài liệu xuống. Điều này có thể tạo ra sự khác biệt lớn khi xử lý các tệp lớn.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Bản minh hoạ

Bản minh hoạ bên dưới cho thấy các luồng có thể đọc, ghi và biến đổi đang hoạt động. Tài liệu này cũng bao gồm các ví dụ về chuỗi ống pipeThrough()pipeTo(), đồng thời minh hoạ tee(). Bạn có thể tuỳ ý chạy bản minh hoạ trong cửa sổ riêng hoặc xem mã nguồn.

Các luồng hữu ích có trong trình duyệt

Có một số luồng hữu ích được tích hợp sẵn trong trình duyệt. Bạn có thể dễ dàng tạo một ReadableStream từ một blob. Phương thức stream() của giao diện Blob sẽ trả về một ReadableStream. Khi đọc, phương thức này sẽ trả về dữ liệu có trong blob. Ngoài ra, hãy nhớ rằng đối tượng File là một loại Blob cụ thể và có thể được sử dụng trong bất kỳ ngữ cảnh nào mà blob có thể sử dụng.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Các biến thể truyền trực tuyến của TextDecoder.decode()TextEncoder.encode() lần lượt được gọi là TextDecoderStreamTextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Bạn có thể dễ dàng nén hoặc giải nén tệp bằng các luồng chuyển đổi CompressionStreamDecompressionStream tương ứng. Mã mẫu bên dưới cho biết cách bạn có thể tải xuống thông số kỹ thuật của Luồng, nén (gzip) thông số kỹ thuật đó ngay trong trình duyệt và ghi trực tiếp tệp đã nén vào ổ đĩa.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream của File System Access API (API truy cập hệ thống tệp) và fetch() yêu cầu luồng thử nghiệm là ví dụ về các luồng có thể ghi trong thực tế.

Serial API (API nối tiếp) sử dụng nhiều cả luồng có thể đọc và có thể ghi.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Cuối cùng, API WebSocketStream tích hợp các luồng với API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Tài nguyên hữu ích

Lời cảm ơn

Bài viết này đã được Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe MedleyAdam Rice xem xét. Các bài đăng trên blog của Jake Archibald đã giúp tôi rất nhiều trong việc tìm hiểu về luồng. Một số mã mẫu được lấy cảm hứng từ nội dung khám phá của người dùng GitHub @bellbind và một số phần của nội dung này được xây dựng dựa trên Tài liệu web MDN về luồng. Các tác giả của Streams Standard đã làm rất tốt việc viết thông số kỹ thuật này. Hình ảnh chính của Ryan Lara trên Unsplash.