Luồng — Hướng dẫn chính thức

Tìm hiểu cách sử dụng các luồng có thể đọc, có thể ghi và có thể chuyển đổi bằng Streams API.

Streams API cho phép bạn truy cập theo phương thức có lập trình vào các luồng dữ liệu nhận được qua mạng hoặc được tạo bằng bất kỳ phương tiện nào tại địa phương và xử lý chúng bằng JavaScript. Truyền phát trực tiếp là quá trình phân chia một tài nguyên mà bạn muốn nhận, gửi hoặc chuyển đổi thành các khối nhỏ, sau đó xử lý từng khối một. Mặc dù phát trực tuyến là việc mà các trình duyệt vẫn làm khi nhận được các thành phần như HTML hoặc video sẽ xuất hiện trên các trang web, nhưng trước khi fetch có các luồng được giới thiệu vào năm 2015, JavaScript chưa bao giờ có khả năng này.

Trước đây, nếu muốn xử lý một loại tài nguyên nào đó (có thể là video, tệp văn bản, v.v.), bạn sẽ phải tải toàn bộ tệp xuống, đợi tệp đó được chuyển đổi tuần tự thành một định dạng phù hợp rồi mới xử lý. Với các luồng có sẵn cho JavaScript, mọi thứ sẽ thay đổi. Giờ đây, bạn có thể xử lý dữ liệu thô bằng JavaScript một cách liên tục ngay khi dữ liệu có sẵn trên ứng dụng, mà không cần tạo vùng đệm, chuỗi hoặc blob. Điều này mở ra một số trường hợp sử dụng, một số trường hợp được liệt kê dưới đây:

  • Hiệu ứng video: truyền một luồng video có thể đọc được thông qua một luồng biến đổi áp dụng hiệu ứng theo thời gian thực.
  • (Giải) nén dữ liệu: truyền một luồng tệp qua một luồng biến đổi để chọn lọc (giải) nén tệp đó.
  • Giải mã hình ảnh: truyền một luồng phản hồi HTTP qua một luồng biến đổi giúp giải mã các byte thành dữ liệu bitmap, sau đó truyền qua một luồng biến đổi khác giúp chuyển đổi bitmap thành PNG. Nếu được cài đặt bên trong trình xử lý fetch của một trình chạy dịch vụ, thì điều này cho phép bạn polyfill trong suốt các định dạng hình ảnh mới như AVIF.

Hỗ trợ trình duyệt

ReadableStream và WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

Các khái niệm cốt lõi

Trước khi đi vào chi tiết về các loại luồng phát, tôi xin giới thiệu một số khái niệm cốt lõi.

Khối

Đoạn là một phần dữ liệu được ghi vào hoặc đọc từ một luồng. Đó có thể là bất kỳ loại nào; các luồng thậm chí có thể chứa các khối thuộc nhiều loại. Hầu hết thời gian, một đoạn sẽ không phải là đơn vị dữ liệu nguyên tử nhất cho một luồng dữ liệu nhất định. Ví dụ: một luồng byte có thể chứa các khối bao gồm 16 đơn vị KiB Uint8Array, thay vì các byte đơn lẻ.

Luồng có thể đọc

Luồng có thể đọc đại diện cho một nguồn dữ liệu mà bạn có thể đọc. Nói cách khác, dữ liệu xuất hiện trong một luồng có thể đọc. Cụ thể, luồng có thể đọc là một thực thể của lớp ReadableStream.

Luồng có thể ghi

Luồng có thể ghi đại diện cho đích đến của dữ liệu mà bạn có thể ghi vào. Nói cách khác, dữ liệu đi vào một luồng có thể ghi. Cụ thể, luồng có thể ghi là một thực thể của lớp WritableStream.

Biến đổi luồng

Luồng biến đổi bao gồm một cặp luồng: một luồng có thể ghi (còn gọi là phía có thể ghi) và một luồng có thể đọc (còn gọi là phía có thể đọc). Một phép ẩn dụ ngoài đời thực cho việc này sẽ là một phiên dịch viên song song, người dịch từ ngôn ngữ này sang ngôn ngữ khác ngay lập tức. Theo cách dành riêng cho luồng biến đổi, việc ghi vào phía có thể ghi sẽ dẫn đến việc dữ liệu mới có sẵn để đọc từ phía có thể đọc. Cụ thể, mọi đối tượng có thuộc tính writable và thuộc tính readable đều có thể đóng vai trò là luồng biến đổi. Tuy nhiên, lớp TransformStream tiêu chuẩn giúp bạn dễ dàng tạo một cặp như vậy được liên kết đúng cách.

Xích ống

Các luồng chủ yếu được dùng bằng cách truyền chúng cho nhau. Bạn có thể truyền trực tiếp một luồng có thể đọc đến một luồng có thể ghi bằng phương thức pipeTo() của luồng có thể đọc hoặc truyền qua một hoặc nhiều luồng biến đổi trước bằng phương thức pipeThrough() của luồng có thể đọc. Một tập hợp các luồng được truyền cùng nhau theo cách này được gọi là chuỗi truyền dữ liệu.

Áp suất ngược

Sau khi được tạo, chuỗi đường ống sẽ truyền tín hiệu về tốc độ truyền các khối dữ liệu qua chuỗi đó. Nếu bất kỳ bước nào trong chuỗi chưa thể chấp nhận các khối, thì bước đó sẽ truyền một tín hiệu ngược lại thông qua chuỗi ống, cho đến khi nguồn ban đầu được yêu cầu ngừng tạo các khối quá nhanh. Quá trình chuẩn hoá luồng này được gọi là áp suất ngược.

Teeing

Bạn có thể chia luồng có thể đọc (được đặt tên theo hình dạng của chữ "T" viết hoa) bằng phương thức tee(). Thao tác này sẽ khoá luồng, tức là không thể sử dụng trực tiếp nữa; tuy nhiên, thao tác này sẽ tạo hai luồng mới, được gọi là nhánh, có thể sử dụng độc lập. Việc phân nhánh cũng rất quan trọng vì các luồng không thể tua lại hoặc khởi động lại. Chúng ta sẽ tìm hiểu thêm về vấn đề này sau.

Sơ đồ về một chuỗi ống bao gồm một luồng có thể đọc được đến từ một lệnh gọi đến API tìm nạp, sau đó được truyền qua một luồng biến đổi có đầu ra được chia thành hai và sau đó được gửi đến trình duyệt cho luồng có thể đọc được đầu tiên và đến bộ nhớ đệm của worker dịch vụ cho luồng có thể đọc được thứ hai.
Một chuỗi đường ống.

Cơ chế của luồng có thể đọc

Luồng có thể đọc là một nguồn dữ liệu được biểu thị bằng đối tượng ReadableStream trong JavaScript, luồng này bắt nguồn từ một nguồn cơ bản. Hàm khởi tạo ReadableStream() sẽ tạo và trả về một đối tượng luồng có thể đọc từ các trình xử lý đã cho. Có hai loại nguồn cơ bản:

  • Nguồn đẩy liên tục đẩy dữ liệu cho bạn khi bạn đã truy cập vào các nguồn đó. Bạn có thể bắt đầu, tạm dừng hoặc huỷ quyền truy cập vào luồng dữ liệu. Ví dụ: luồng video trực tiếp, sự kiện do máy chủ gửi hoặc WebSockets.
  • Nguồn kéo yêu cầu bạn phải yêu cầu dữ liệu một cách rõ ràng từ các nguồn này sau khi kết nối. Ví dụ: các thao tác HTTP thông qua lệnh gọi fetch() hoặc XMLHttpRequest.

Dữ liệu truyền phát được đọc tuần tự theo các phần nhỏ gọi là đoạn. Các đoạn được đặt trong một luồng được gọi là được xếp vào hàng đợi. Điều này có nghĩa là các thông báo đó đang chờ trong hàng đợi để được đọc. Một hàng đợi nội bộ theo dõi các khối chưa được đọc.

Chiến lược xếp hàng là một đối tượng xác định cách một luồng phải báo hiệu áp lực ngược dựa trên trạng thái của hàng đợi nội bộ. Chiến lược xếp hàng sẽ chỉ định một kích thước cho mỗi khối và so sánh tổng kích thước của tất cả các khối trong hàng đợi với một số được chỉ định, được gọi là dấu hiệu mực nước cao.

Các khối bên trong luồng được trình đọc đọc. Trình đọc này truy xuất dữ liệu từng khối một, cho phép bạn thực hiện bất kỳ loại thao tác nào bạn muốn trên dữ liệu đó. Trình đọc cùng với mã xử lý khác đi kèm được gọi là người dùng.

Cấu trúc tiếp theo trong ngữ cảnh này được gọi là bộ điều khiển. Mỗi luồng có thể đọc đều có một bộ điều khiển liên kết. Như tên gọi cho thấy, bộ điều khiển này cho phép bạn kiểm soát luồng.

Mỗi lần chỉ có một trình đọc có thể đọc một luồng; khi một trình đọc được tạo và bắt đầu đọc một luồng (tức là trở thành một trình đọc đang hoạt động), thì luồng đó sẽ bị khoá đối với trình đọc đó. Nếu muốn một trình đọc khác tiếp quản việc đọc luồng dữ liệu, bạn thường cần phát hành trình đọc đầu tiên trước khi làm bất cứ việc gì khác (mặc dù bạn có thể tee luồng dữ liệu).

Tạo luồng có thể đọc

Bạn tạo một luồng có thể đọc bằng cách gọi hàm khởi tạo ReadableStream(). Hàm khởi tạo có một đối số không bắt buộc là underlyingSource, đại diện cho một đối tượng có các phương thức và thuộc tính xác định cách hoạt động của thực thể luồng được tạo.

underlyingSource

Thao tác này có thể sử dụng các phương thức không bắt buộc sau do nhà phát triển xác định:

  • start(controller): Được gọi ngay khi đối tượng được tạo. Phương thức này có thể truy cập vào nguồn luồng và thực hiện mọi thao tác khác cần thiết để thiết lập chức năng luồng. Nếu quy trình này được thực hiện không đồng bộ, phương thức có thể trả về một lời hứa để báo hiệu thành công hoặc thất bại. Tham số controller được truyền đến phương thức này là ReadableStreamDefaultController.
  • pull(controller): Có thể dùng để kiểm soát luồng khi nhiều đoạn được tìm nạp hơn. Hàm này được gọi nhiều lần miễn là hàng đợi nội bộ của luồng không đầy, cho đến khi hàng đợi đạt đến ngưỡng cao. Nếu kết quả của việc gọi pull() là một promise, thì pull() sẽ không được gọi lại cho đến khi promise đó hoàn tất. Nếu lệnh hứa từ chối, luồng sẽ gặp lỗi.
  • cancel(reason): Sẽ gọi khi người dùng truyền phát huỷ luồng truyền phát.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController hỗ trợ các phương thức sau:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Đối số thứ hai (cũng không bắt buộc) của hàm khởi tạo ReadableStream()queuingStrategy. Đây là một đối tượng không bắt buộc xác định chiến lược xếp hàng cho luồng, nhận 2 tham số:

  • highWaterMark: Một số không âm cho biết dấu hiệu mực nước cao của luồng bằng chiến lược xếp hàng này.
  • size(chunk): Một hàm tính toán và trả về kích thước hữu hạn không âm của giá trị khối đã cho. Kết quả này được dùng để xác định áp suất ngược, biểu thị thông qua thuộc tính ReadableStreamDefaultController.desiredSize thích hợp. Nó cũng chi phối thời điểm phương thức pull() của nguồn cơ bản được gọi.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Các phương thức getReader()read()

Để đọc từ một luồng có thể đọc, bạn cần có một trình đọc, đó sẽ là ReadableStreamDefaultReader. Phương thức getReader() của giao diện ReadableStream sẽ tạo một trình đọc và khoá luồng vào đó. Khi luồng bị khoá, bạn không thể lấy được trình đọc nào khác cho đến khi trình đọc này được giải phóng.

Phương thức read() của giao diện ReadableStreamDefaultReader trả về một lời hứa cung cấp quyền truy cập vào khối tiếp theo trong hàng đợi nội bộ của luồng. Nó sẽ hoàn tất hoặc từ chối bằng một kết quả tuỳ thuộc vào trạng thái của luồng. Sau đây là các trường hợp có thể xảy ra:

  • Nếu có một đoạn, thì promise sẽ được thực hiện với một đối tượng thuộc biểu mẫu
    { value: chunk, done: false }.
  • Nếu luồng bị đóng, thì promise sẽ được thực hiện với một đối tượng có dạng
    { value: undefined, done: true }.
  • Nếu luồng gặp lỗi, thì lời hứa sẽ bị từ chối kèm theo lỗi liên quan.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Thuộc tính locked

Bạn có thể kiểm tra xem luồng có thể đọc có bị khoá hay không bằng cách truy cập vào thuộc tính ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Mã mẫu luồng có thể đọc

Mã mẫu bên dưới cho thấy tất cả các bước đang hoạt động. Trước tiên, bạn tạo một ReadableStream trong đối số underlyingSource (tức là lớp TimestampSource) xác định một phương thức start(). Phương thức này yêu cầu controller của luồng enqueue() dấu thời gian mỗi giây trong 10 giây. Cuối cùng, nó yêu cầu trình điều khiển close() luồng. Bạn sử dụng luồng này bằng cách tạo một trình đọc thông qua phương thức getReader() và gọi read() cho đến khi luồng là done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Vòng lặp không đồng bộ

Kiểm tra trong mỗi lần lặp vòng lặp read() xem luồng có phải là done hay không có thể không phải là API thuận tiện nhất. Rất may là sắp tới sẽ có một cách tốt hơn để thực hiện việc này: vòng lặp không đồng bộ.

for await (const chunk of stream) {
  console.log(chunk);
}

Một giải pháp tạm thời để sử dụng vòng lặp không đồng bộ ngay hôm nay là triển khai hành vi bằng một polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Tạo bản sao của luồng có thể đọc

Phương thức tee() của giao diện ReadableStream sẽ phân chia luồng có thể đọc hiện tại, trả về một mảng gồm hai phần tử chứa hai nhánh kết quả dưới dạng các thực thể ReadableStream mới. Điều này cho phép hai trình đọc đọc một luồng dữ liệu cùng lúc. Ví dụ: bạn có thể thực hiện việc này trong một trình chạy dịch vụ nếu muốn tìm nạp một phản hồi từ máy chủ và truyền trực tuyến phản hồi đó đến trình duyệt, nhưng cũng truyền trực tuyến phản hồi đó đến bộ nhớ đệm của trình chạy dịch vụ. Vì nội dung phản hồi không thể được sử dụng nhiều lần, nên bạn cần 2 bản sao để thực hiện việc này. Để huỷ luồng, bạn cần huỷ cả hai nhánh kết quả. Việc phân nhánh một luồng thường sẽ khoá luồng đó trong khoảng thời gian nhất định, ngăn những trình đọc khác khoá luồng đó.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Luồng byte có thể đọc

Đối với các luồng đại diện cho byte, một phiên bản mở rộng của luồng có thể đọc được sẽ được cung cấp để xử lý byte một cách hiệu quả, đặc biệt là bằng cách giảm thiểu số lượng bản sao. Luồng byte cho phép thu nạp những trình đọc mang theo bộ đệm riêng (BYOB). Quá trình triển khai mặc định có thể đưa ra nhiều đầu ra khác nhau, chẳng hạn như chuỗi hoặc vùng đệm mảng trong trường hợp WebSockets, trong khi luồng byte đảm bảo đầu ra byte. Ngoài ra, trình đọc BYOB còn có những lợi ích về độ ổn định. Điều này là do nếu một vùng đệm tách ra, thì vùng đệm đó có thể đảm bảo rằng một vùng đệm không ghi vào cùng một vùng đệm hai lần, do đó tránh được tình trạng xung đột dữ liệu. Các trình đọc BYOB có thể giảm số lần trình duyệt cần chạy quy trình thu thập rác vì có thể dùng lại các vùng đệm.

Tạo luồng byte có thể đọc

Bạn có thể tạo một luồng byte có thể đọc bằng cách truyền thêm một tham số type vào hàm khởi tạo ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Nguồn cơ bản của luồng byte có thể đọc được sẽ được cung cấp một ReadableByteStreamController để thao tác. Phương thức ReadableByteStreamController.enqueue() của lớp này nhận một đối số chunk có giá trị là ArrayBufferView. Thuộc tính ReadableByteStreamController.byobRequest trả về yêu cầu kéo BYOB hiện tại hoặc giá trị rỗng nếu không có yêu cầu nào. Cuối cùng, thuộc tính ReadableByteStreamController.desiredSize sẽ trả về kích thước mong muốn để lấp đầy hàng đợi nội bộ của luồng được kiểm soát.

queuingStrategy

Đối số thứ hai (cũng không bắt buộc) của hàm khởi tạo ReadableStream()queuingStrategy. Đây là một đối tượng có thể xác định chiến lược xếp hàng cho luồng, nhận một tham số:

  • highWaterMark: Số byte không âm cho biết dấu hiệu mực nước cao của luồng bằng chiến lược xếp hàng này. Điều này được dùng để xác định áp suất ngược, biểu thị thông qua thuộc tính ReadableByteStreamController.desiredSize thích hợp. Nó cũng chi phối thời điểm phương thức pull() của nguồn cơ bản được gọi.

Các phương thức getReader()read()

Sau đó, bạn có thể truy cập vào một ReadableStreamBYOBReader bằng cách đặt tham số mode cho phù hợp: ReadableStream.getReader({ mode: "byob" }). Điều này giúp kiểm soát chính xác hơn việc phân bổ vùng đệm để tránh các bản sao. Để đọc từ luồng byte, bạn cần gọi ReadableStreamBYOBReader.read(view), trong đó viewArrayBufferView.

Đoạn mã mẫu luồng byte có thể đọc

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Hàm sau đây trả về các luồng byte có thể đọc được, cho phép đọc hiệu quả mà không cần sao chép một mảng được tạo ngẫu nhiên. Thay vì sử dụng kích thước khối được xác định trước là 1.024, phương thức này sẽ cố gắng điền vào vùng đệm do nhà phát triển cung cấp, cho phép kiểm soát hoàn toàn.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Cơ chế của luồng có thể ghi

Luồng có thể ghi là đích đến mà bạn có thể ghi dữ liệu, được biểu thị trong JavaScript bằng đối tượng WritableStream. Đây đóng vai trò là một lớp trừu tượng ở trên cùng của một sink cơ bản – một sink I/O cấp thấp hơn mà dữ liệu thô được ghi vào.

Dữ liệu được ghi vào luồng thông qua một trình ghi, mỗi lần một khối. Một đoạn có thể có nhiều hình thức, giống như các đoạn trong một tệp đọc. Bạn có thể sử dụng bất kỳ mã nào bạn muốn để tạo ra các đoạn mã sẵn sàng để ghi; người viết cộng với mã liên kết được gọi là nhà sản xuất.

Khi được tạo và bắt đầu ghi vào một luồng (trình ghi đang hoạt động), trình ghi sẽ được coi là bị khoá vào luồng đó. Mỗi lần, chỉ có một trình ghi có thể ghi vào luồng có thể ghi. Nếu muốn một trình ghi khác bắt đầu ghi vào luồng của bạn, thì bạn thường cần phát hành luồng đó, sau đó đính kèm một trình ghi khác vào luồng.

Một hàng đợi nội bộ theo dõi các khối đã được ghi vào luồng nhưng chưa được xử lý bởi đích nhận cơ bản.

Chiến lược xếp hàng là một đối tượng xác định cách một luồng phải báo hiệu áp suất ngược dựa trên trạng thái của hàng đợi nội bộ. Chiến lược xếp hàng sẽ chỉ định một kích thước cho mỗi khối và so sánh tổng kích thước của tất cả các khối trong hàng đợi với một số được chỉ định, được gọi là dấu hiệu mực nước cao.

Cấu trúc cuối cùng được gọi là bộ điều khiển. Mỗi luồng có thể ghi đều có một bộ điều khiển liên kết cho phép bạn kiểm soát luồng (ví dụ: để huỷ luồng).

Tạo luồng có thể ghi

Giao diện WritableStream của Streams API cung cấp một lớp trừu tượng tiêu chuẩn để ghi dữ liệu truyền trực tuyến vào một đích đến, còn được gọi là nguồn nhận. Đối tượng này có sẵn tính năng điều chỉnh tốc độ và xếp hàng. Bạn tạo một luồng có thể ghi bằng cách gọi hàm khởi tạo WritableStream(). Nó có một tham số underlyingSink không bắt buộc, đại diện cho một đối tượng có các phương thức và thuộc tính xác định cách hoạt động của phiên bản luồng được tạo.

underlyingSink

underlyingSink có thể bao gồm các phương thức không bắt buộc sau đây do nhà phát triển xác định. Tham số controller được truyền đến một số phương thức là WritableStreamDefaultController.

  • start(controller): Phương thức này được gọi ngay khi đối tượng được tạo. Nội dung của phương thức này phải nhằm mục đích truy cập vào đích nhận cơ bản. Nếu quy trình này được thực hiện không đồng bộ, thì quy trình này có thể trả về một lời hứa để báo hiệu trạng thái thành công hoặc không thành công.
  • write(chunk, controller): Phương thức này sẽ được gọi khi một khối dữ liệu mới (được chỉ định trong tham số chunk) đã sẵn sàng để được ghi vào nguồn nhận cơ bản. Phương thức này có thể trả về một lời hứa để báo hiệu trạng thái thành công hoặc không thành công của thao tác ghi. Phương thức này sẽ chỉ được gọi sau khi các thao tác ghi trước đó thành công và không bao giờ được gọi sau khi luồng bị đóng hoặc huỷ.
  • close(controller): Phương thức này sẽ được gọi nếu ứng dụng báo hiệu rằng ứng dụng đã ghi xong các khối vào luồng. Nội dung phải làm mọi thứ cần thiết để hoàn tất việc ghi vào nguồn nhận dữ liệu cơ bản và giải phóng quyền truy cập vào nguồn đó. Nếu quy trình này không đồng bộ, thì quy trình này có thể trả về một lời hứa để báo hiệu trạng thái thành công hoặc không thành công. Phương thức này sẽ chỉ được gọi sau khi tất cả các thao tác ghi được xếp hàng đã thành công.
  • abort(reason): Phương thức này sẽ được gọi nếu ứng dụng báo hiệu rằng ứng dụng muốn đóng đột ngột luồng và đặt luồng ở trạng thái có lỗi. Phương thức này có thể dọn dẹp mọi tài nguyên được giữ lại, tương tự như close(), nhưng abort() sẽ được gọi ngay cả khi các thao tác ghi được xếp hàng. Những đoạn này sẽ bị loại bỏ. Nếu quy trình này là không đồng bộ, thì quy trình này có thể trả về một lời hứa để báo hiệu trạng thái thành công hoặc không thành công. Tham số reason chứa một DOMString mô tả lý do luồng bị huỷ.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Giao diện WritableStreamDefaultController của Streams API đại diện cho một trình điều khiển cho phép kiểm soát trạng thái của WritableStream trong quá trình thiết lập, khi nhiều khối được gửi để ghi hoặc khi kết thúc quá trình ghi. Khi tạo một WritableStream, đích nhận cơ bản sẽ được cung cấp một thực thể WritableStreamDefaultController tương ứng để thao tác. WritableStreamDefaultController chỉ có một phương thức: WritableStreamDefaultController.error(), khiến mọi lượt tương tác trong tương lai với luồng liên kết đều gặp lỗi. WritableStreamDefaultController cũng hỗ trợ thuộc tính signal trả về một thực thể của AbortSignal, cho phép dừng thao tác WritableStream nếu cần.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Đối số thứ hai (cũng không bắt buộc) của hàm khởi tạo WritableStream()queuingStrategy. Đây là một đối tượng không bắt buộc xác định chiến lược xếp hàng cho luồng, nhận 2 tham số:

  • highWaterMark: Một số không âm cho biết dấu hiệu mực nước cao của luồng bằng chiến lược xếp hàng này.
  • size(chunk): Một hàm tính toán và trả về kích thước hữu hạn không âm của giá trị khối đã cho. Kết quả này được dùng để xác định áp suất ngược, biểu thị thông qua thuộc tính WritableStreamDefaultWriter.desiredSize thích hợp.

Các phương thức getWriter()write()

Để ghi vào một luồng có thể ghi, bạn cần có một trình ghi. Trình ghi này sẽ là WritableStreamDefaultWriter. Phương thức getWriter() của giao diện WritableStream sẽ trả về một phiên bản mới của WritableStreamDefaultWriter và khoá luồng vào phiên bản đó. Trong khi luồng bị khoá, không thể lấy được trình ghi nào khác cho đến khi trình ghi hiện tại được giải phóng.

Phương thức write() của giao diện WritableStreamDefaultWriter ghi một đoạn dữ liệu đã truyền vào WritableStream và đích nhận cơ bản của nó, sau đó trả về một lời hứa phân giải để cho biết thao tác ghi thành công hay thất bại. Xin lưu ý rằng ý nghĩa của "thành công" tuỳ thuộc vào đích nhận cơ bản; điều này có thể cho biết rằng khối đã được chấp nhận, chứ không nhất thiết là khối đó đã được lưu an toàn vào đích đến cuối cùng.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Thuộc tính locked

Bạn có thể kiểm tra xem một luồng có thể ghi có bị khoá hay không bằng cách truy cập vào thuộc tính WritableStream.locked của luồng đó.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Đoạn mã mẫu luồng có thể ghi

Mã mẫu bên dưới cho thấy tất cả các bước đang hoạt động.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Truyền một luồng có thể đọc đến một luồng có thể ghi

Bạn có thể truyền một luồng có thể đọc đến một luồng có thể ghi thông qua phương thức pipeTo() của luồng có thể đọc. ReadableStream.pipeTo() chuyển ReadableStream hiện tại đến một WritableStream nhất định và trả về một lời hứa hoàn thành khi quy trình chuyển hoàn tất thành công hoặc từ chối nếu gặp phải lỗi.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Tạo luồng biến đổi

Giao diện TransformStream của Streams API đại diện cho một tập hợp dữ liệu có thể biến đổi. Bạn tạo một luồng biến đổi bằng cách gọi hàm khởi tạo TransformStream(). Hàm này sẽ tạo và trả về một đối tượng luồng biến đổi từ các trình xử lý đã cho. Hàm dựng TransformStream() chấp nhận đối số đầu tiên là một đối tượng JavaScript không bắt buộc, đại diện cho transformer. Các đối tượng như vậy có thể chứa bất kỳ phương thức nào sau đây:

transformer

  • start(controller): Phương thức này được gọi ngay khi đối tượng được tạo. Thông thường, bạn sẽ dùng phương thức này để xếp hàng các khối tiền tố bằng cách sử dụng controller.enqueue(). Những khối này sẽ được đọc từ phía có thể đọc nhưng không phụ thuộc vào bất kỳ hoạt động ghi nào vào phía có thể ghi. Nếu quy trình ban đầu này là không đồng bộ, chẳng hạn như vì cần một số nỗ lực để có được các khối tiền tố, thì hàm có thể trả về một lời hứa để báo hiệu trạng thái thành công hoặc không thành công; một lời hứa bị từ chối sẽ báo lỗi cho luồng. Mọi ngoại lệ đã được tạo sẽ được hàm khởi tạo TransformStream() tạo lại.
  • transform(chunk, controller): Phương thức này được gọi khi một khối mới được ghi vào phía có thể ghi đã sẵn sàng để được chuyển đổi. Việc triển khai luồng đảm bảo rằng hàm này sẽ chỉ được gọi sau khi các biến đổi trước đó thành công và không bao giờ trước khi start() hoàn tất hoặc sau khi flush() được gọi. Hàm này thực hiện công việc biến đổi thực tế của luồng biến đổi. Bạn có thể xếp hàng các kết quả bằng cách sử dụng controller.enqueue(). Điều này cho phép một khối duy nhất được ghi vào phía có thể ghi để tạo ra 0 hoặc nhiều khối ở phía có thể đọc, tuỳ thuộc vào số lần controller.enqueue() được gọi. Nếu quá trình chuyển đổi là không đồng bộ, hàm này có thể trả về một lời hứa để báo hiệu thành công hoặc thất bại của quá trình chuyển đổi. Một lời hứa bị từ chối sẽ gây ra lỗi cho cả hai phía có thể đọc và ghi của luồng biến đổi. Nếu không có phương thức transform() nào được cung cấp, thì phép biến đổi danh tính sẽ được dùng. Phép biến đổi này sẽ xếp các khối không thay đổi từ phía có thể ghi vào phía có thể đọc.
  • flush(controller): Phương thức này được gọi sau khi tất cả các khối được ghi vào phía có thể ghi đã được chuyển đổi bằng cách truyền thành công qua transform() và phía có thể ghi sắp đóng. Thông thường, thao tác này được dùng để xếp hàng các khối hậu tố vào phía có thể đọc, trước khi phía đó cũng đóng. Nếu quy trình xoá là không đồng bộ, hàm có thể trả về một lời hứa để báo hiệu thành công hoặc thất bại; kết quả sẽ được truyền đạt cho phương thức gọi của stream.writable.write(). Ngoài ra, một lời hứa bị từ chối sẽ gây ra lỗi cho cả hai phía có thể đọc và ghi của luồng. Việc tạo một ngoại lệ được coi như việc trả về một lời hứa bị từ chối.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Chiến lược xếp hàng writableStrategyreadableStrategy

Tham số không bắt buộc thứ hai và thứ ba của hàm khởi tạo TransformStream() là các chiến lược xếp hàng writableStrategyreadableStrategy không bắt buộc. Chúng được xác định như mô tả trong các phần luồng có thể đọccó thể ghi tương ứng.

Mẫu mã luồng biến đổi

Mã mẫu sau đây minh hoạ một luồng biến đổi đơn giản đang hoạt động.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Truyền một luồng có thể đọc qua một luồng biến đổi

Phương thức pipeThrough() của giao diện ReadableStream cung cấp một cách có thể liên kết để truyền luồng hiện tại thông qua một luồng biến đổi hoặc bất kỳ cặp có thể ghi/đọc nào khác. Việc truyền một luồng thường sẽ khoá luồng đó trong thời gian truyền, ngăn các trình đọc khác khoá luồng.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Mẫu mã tiếp theo (hơi giả tạo) cho biết cách bạn có thể triển khai phiên bản "hét" của fetch(), trong đó viết hoa tất cả văn bản bằng cách sử dụng lời hứa phản hồi được trả về dưới dạng một luồng và viết hoa từng khối. Ưu điểm của phương pháp này là bạn không cần đợi tải toàn bộ tài liệu xuống. Điều này có thể tạo ra sự khác biệt lớn khi xử lý các tệp lớn.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Bản minh hoạ

Bản minh hoạ bên dưới cho thấy các luồng có thể đọc, ghi và biến đổi đang hoạt động. Tài liệu này cũng bao gồm các ví dụ về chuỗi lệnh pipeThrough()pipeTo(), đồng thời minh hoạ tee(). Bạn có thể chạy bản minh hoạ trong cửa sổ riêng hoặc xem mã nguồn (không bắt buộc).

Các luồng hữu ích có trong trình duyệt

Có một số luồng hữu ích được tích hợp ngay trong trình duyệt. Bạn có thể dễ dàng tạo một ReadableStream từ một blob. Phương thức stream() của giao diện Blob sẽ trả về một ReadableStream. Khi đọc, ReadableStream này sẽ trả về dữ liệu có trong blob. Ngoài ra, hãy nhớ rằng đối tượng File là một loại cụ thể của Blob và có thể được dùng trong mọi ngữ cảnh mà một blob có thể.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Các biến thể truyền phát trực tiếp của TextDecoder.decode()TextEncoder.encode() lần lượt được gọi là TextDecoderStreamTextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Bạn có thể dễ dàng nén hoặc giải nén tệp bằng các luồng biến đổi CompressionStreamDecompressionStream tương ứng. Mã mẫu dưới đây cho thấy cách bạn có thể tải Streams spec xuống, nén (gzip) ngay trong trình duyệt và ghi tệp nén trực tiếp vào đĩa.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access API (API Quyền truy cập vào hệ thống tệp) FileSystemWritableFileStreamfetch() luồng yêu cầu thử nghiệm là các ví dụ về luồng có thể ghi trong tự nhiên.

Serial API sử dụng nhiều cả luồng có thể đọc và luồng có thể ghi.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Cuối cùng, API WebSocketStream tích hợp các luồng với WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Tài nguyên hữu ích

Lời cảm ơn

Bài viết này được Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe MedleyAdam Rice xem xét. Các bài đăng trên blog của Jake Archibald đã giúp tôi rất nhiều trong việc tìm hiểu về luồng. Một số mã mẫu được lấy cảm hứng từ các khám phá của người dùng GitHub @bellbind và các phần của văn bản được xây dựng dựa trên Tài liệu web MDN về luồng. Các tác giả của Tiêu chuẩn về luồng đã hoàn thành xuất sắc việc viết bản đặc tả này. Hình ảnh chính của Ryan Lara trên Unsplash.