Streams - 최종 가이드

Streams API를 사용해 읽기, 쓰기, 변환 스트림을 사용하는 방법을 알아보세요.

Streams API를 사용하면 네트워크를 통해 수신되거나 로컬에서 생성된 데이터 스트림에 프로그래매틱 방식으로 액세스하여 JavaScript로 처리할 수 있습니다. 스트리밍에는 수신, 전송 또는 변환하려는 리소스를 작은 청크로 분해한 후 이 청크를 조금씩 처리하는 작업이 포함됩니다. 스트리밍은 브라우저에서 웹페이지에 표시할 HTML 또는 동영상과 같은 애셋을 수신할 때 하는 일이지만 2015년에 스트림을 포함하는 fetch가 도입되기 전에는 JavaScript에서 이 기능을 사용할 수 없었습니다.

이전에는 동영상 또는 텍스트 파일 등의 일종의 리소스를 처리하려면 전체 파일을 다운로드하고 적절한 형식으로 역직렬화될 때까지 기다린 후 처리해야 했습니다. 스트림을 JavaScript에서 사용할 수 있으므로 이 모든 것이 달라집니다. 이제 버퍼, 문자열 또는 blob을 생성할 필요 없이 클라이언트에서 사용할 수 있게 되는 즉시 JavaScript로 원시 데이터를 점진적으로 처리할 수 있습니다. 이를 통해 다양한 사용 사례를 활용할 수 있으며, 그중 일부는 아래에 나열되어 있습니다.

  • 동영상 효과: 실시간으로 효과를 적용하는 변환 스트림을 통해 읽을 수 있는 동영상 스트림을 파이핑합니다.
  • 데이터 (압축 해제): 파일을 선택적으로(해제) 압축하는 변환 스트림을 통해 파일 스트림을 파이핑합니다.
  • 이미지 디코딩: 바이트를 비트맵 데이터로 디코딩하는 변환 스트림을 통해 HTTP 응답 스트림을 파이핑한 후 비트맵을 PNG로 변환하는 다른 변환 스트림을 통해 파이핑합니다. 서비스 워커의 fetch 핸들러 내에 설치된 경우 AVIF와 같은 새로운 이미지 형식을 투명하게 폴리필할 수 있습니다.

브라우저 지원

ReadableStream 및 WritableStream

브라우저 지원

  • 43
  • 14
  • 65
  • 10.1

소스

TransformStream

브라우저 지원

  • 67
  • 79
  • 102
  • 14.1

소스

핵심 개념

다양한 스트림 유형을 자세히 알아보기 전에 몇 가지 핵심 개념을 소개하겠습니다.

덩어리

청크는 스트림에 쓰거나 스트림에서 읽히는 단일 데이터 조각입니다. 모든 유형이 될 수 있으며 스트림에 다양한 유형의 청크가 포함될 수도 있습니다. 대부분의 경우 청크는 특정 스트림에서 가장 원자적인 데이터 단위가 아닙니다. 예를 들어 한 바이트 스트림에 단일 바이트 대신 16KiB Uint8Array 단위로 구성된 청크가 포함될 수 있습니다.

읽을 수 있는 스트림

읽을 수 있는 스트림은 읽을 수 있는 데이터 소스를 나타냅니다. 즉, 데이터는 읽을 수 있는 스트림에서 나옵니다. 구체적으로 읽을 수 있는 스트림은 ReadableStream 클래스의 인스턴스입니다.

쓰기 가능한 스트림

쓰기 가능한 스트림은 데이터를 쓸 수 있는 대상을 나타냅니다. 즉, 데이터가 쓰기 가능한 스트림으로 들어갑니다. 구체적으로, 쓰기 가능한 스트림은 WritableStream 클래스의 인스턴스입니다.

스트림 변환

변환 스트림은 스트림 쌍, 즉 쓰기 가능한 스트림(쓰기 가능한 측이라고 함)과 읽을 수 있는 스트림(읽기 가능한 측이라고 함)으로 구성됩니다. 실제로 한 언어를 다른 언어로 즉시 번역하는 동시 통역이 있습니다. 변환 스트림에만 국한된 방식으로, 쓰기 가능한 측에 쓰기를 수행하면 읽을 수 있는 측에서 읽을 수 있는 새 데이터가 제공됩니다. 구체적으로 writable 속성과 readable 속성이 있는 모든 객체는 변환 스트림으로 사용할 수 있습니다. 그러나 표준 TransformStream 클래스를 사용하면 제대로 연결된 쌍을 더 쉽게 만들 수 있습니다.

파이프 체인

스트림은 주로 스트림을 서로 파이핑하는 데 사용됩니다. 읽을 수 있는 스트림은 읽을 수 있는 스트림의 pipeTo() 메서드를 사용하여 쓰기 가능한 스트림으로 직접 파이핑할 수 있습니다. 또는 읽을 수 있는 스트림의 pipeThrough() 메서드를 사용하여 먼저 하나 이상의 변환 스트림을 통해 파이핑할 수 있습니다. 이러한 방식으로 함께 파이핑된 스트림 집합을 파이프 체인이라고 부릅니다.

백프레셔

파이프 체인이 구축되면 청크가 통과하는 속도에 관한 신호를 전파합니다. 체인의 어느 단계에서도 청크를 수락할 수 없는 경우, 파이프 체인을 통해 신호를 역방향으로 전파하며, 결국에는 원래 소스에 청크 생성을 너무 빨리 중지하라는 지시가 전달됩니다. 이 흐름 정규화 프로세스를 백프레셔라고 합니다.

티잉

읽을 수 있는 스트림은 tee() 메서드를 사용하여 티 (대문자 'T' 모양으로 이름 지정)를 지정할 수 있습니다. 이렇게 하면 스트림이 잠겨 더 이상 직접 사용할 수 없게 됩니다. 하지만 독립적으로 사용할 수 있는 브랜치라는 2개의 새 스트림이 생성됩니다. 스트림을 되감거나 다시 시작할 수 없으므로 티잉도 중요합니다. 이에 대해서는 나중에 자세히 설명합니다.

Import API에 대한 호출에서 발생하는 판독 가능한 스트림으로 구성된 파이프 체인의 다이어그램
파이프 체인.

읽을 수 있는 스트림의 메커니즘

읽을 수 있는 스트림은 기본 소스에서 발생하는 ReadableStream 객체로 JavaScript로 표현되는 데이터 소스입니다. ReadableStream() 생성자는 지정된 핸들러에서 읽을 수 있는 스트림 객체를 만들고 반환합니다. 기본 소스에는 두 가지 유형이 있습니다.

  • 푸시 소스는 사용자가 데이터에 액세스하면 지속적으로 데이터를 푸시하며, 스트림 액세스 권한을 시작, 일시중지 또는 취소하는 것은 사용자의 몫입니다. 실시간 동영상 스트림, 서버 전송 이벤트 또는 WebSocket을 예로 들 수 있습니다.
  • 가져오기 소스를 사용하려면 연결되었을 때 소스에서 데이터를 명시적으로 요청해야 합니다. fetch() 또는 XMLHttpRequest 호출을 통한 HTTP 작업을 예로 들 수 있습니다.

스트림 데이터는 청크라고 하는 작은 조각으로 순차적으로 읽힙니다. 스트림에 배치된 청크를 큐에 추가되었다고 합니다. 이는 읽을 준비가 된 큐에서 대기 중임을 의미합니다. 내부 큐는 아직 읽지 않은 청크를 추적합니다.

큐 추가 전략은 내부 큐의 상태에 따라 스트림이 백프레셔 신호를 보내는 방식을 결정하는 객체입니다. 큐 전략은 각 청크에 크기를 할당하고 큐에 있는 모든 청크의 총 크기를 지정된 수(하이 워터마크라고 함)와 비교합니다.

스트림 내부의 청크는 리더가 읽습니다. 이 판독기는 한 번에 한 청크 데이터를 검색하므로 원하는 작업 유형을 수행할 수 있습니다. 리더 및 이와 함께 제공되는 다른 처리 코드를 소비자라고 합니다.

이 컨텍스트의 다음 구조는 컨트롤러라고 합니다. 읽을 수 있는 각 스트림에는 이름에서 알 수 있듯이 스트림을 제어할 수 있는 연결된 컨트롤러가 있습니다.

한 번에 하나의 판독기만 스트림을 읽을 수 있습니다. 리더가 생성되어 스트림 읽기를 시작하면(즉, 활성 리더가 됨) 리더가 스트림에 잠겨집니다. 다른 리더가 스트림 읽기를 인계받도록 하려면 일반적으로 다른 작업을 하기 전에 첫 번째 리더를 해제해야 합니다(스트림을 티칭할 수 있음).

읽을 수 있는 스트림 만들기

생성자 ReadableStream()를 호출하여 읽을 수 있는 스트림을 만듭니다. 생성자에는 생성된 스트림 인스턴스의 작동 방식을 정의하는 메서드와 속성이 있는 객체를 나타내는 선택적 인수 underlyingSource가 있습니다.

underlyingSource

다음과 같은 개발자가 정의한 선택적 메서드를 사용할 수 있습니다.

  • start(controller): 객체가 생성될 때 즉시 호출됩니다. 이 메서드는 스트림 소스에 액세스하고 스트림 기능을 설정하는 데 필요한 다른 모든 작업을 할 수 있습니다. 이 프로세스를 비동기식으로 실행해야 하는 경우 메서드는 성공 또는 실패를 알리는 프로미스를 반환할 수 있습니다. 이 메서드에 전달되는 controller 매개변수는 ReadableStreamDefaultController입니다.
  • pull(controller): 더 많은 청크를 가져올 때 스트림을 제어하는 데 사용할 수 있습니다. 스트림의 내부 청크 큐가 가득 차지 않는 한 큐가 높은 워터마크에 도달할 때까지 반복적으로 호출됩니다. pull() 호출의 결과가 프로미스인 경우 해당 프로미스가 처리될 때까지 pull()는 다시 호출되지 않습니다. 프로미스가 거부되면 스트림에 오류가 발생합니다.
  • cancel(reason): 스트림 소비자가 스트림을 취소할 때 호출됩니다.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController는 다음 메서드를 지원합니다.

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

마찬가지로 선택사항인 ReadableStream() 생성자의 두 번째 인수는 queuingStrategy입니다. 이 객체는 스트림의 대기열 전략을 선택적으로 정의하는 객체로, 다음 두 가지 매개변수를 사용합니다.

  • highWaterMark: 이 대기열 전략을 사용하는 스트림의 높은 워터마크를 나타내는 음수가 아닌 숫자입니다.
  • size(chunk): 지정된 청크 값의 음수가 아닌 유한 크기를 계산하고 반환하는 함수입니다. 결과는 적절한 ReadableStreamDefaultController.desiredSize 속성을 통해 나타나는 백프레셔를 확인하는 데 사용됩니다. 또한 기본 소스의 pull() 메서드가 호출되는 시점을 제어합니다.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader()read() 메서드

읽을 수 있는 스트림에서 읽으려면 ReadableStreamDefaultReader인 리더가 필요합니다. ReadableStream 인터페이스의 getReader() 메서드는 리더를 만들고 이 리더에 스트림을 잠급니다. 스트림이 잠겨 있으면 이 리더가 해제될 때까지 다른 리더를 획득할 수 없습니다.

ReadableStreamDefaultReader 인터페이스의 read() 메서드는 스트림 내부 큐의 다음 청크에 대한 액세스를 제공하는 프로미스를 반환합니다. 스트림의 상태에 따라 결과를 처리하거나 거부합니다. 다른 가능성은 다음과 같습니다.

  • 청크를 사용할 수 있는 경우 프로미스는
    { value: chunk, done: false } 형식의 객체로 처리됩니다.
  • 스트림이 닫히면 프로미스가
    { value: undefined, done: true } 형식의 객체를 사용하여 처리됩니다.
  • 스트림에 오류가 발생하면 promise는 관련 오류와 함께 거부됩니다.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked 속성

ReadableStream.locked 속성에 액세스하여 읽을 수 있는 스트림이 잠겨 있는지 확인할 수 있습니다.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

읽을 수 있는 스트림 코드 샘플

아래의 코드 샘플은 실제 작동 중인 모든 단계를 보여줍니다. 먼저 underlyingSource 인수 (즉, TimestampSource 클래스)에서 start() 메서드를 정의하는 ReadableStream를 만듭니다. 이 메서드는 10초 동안 매초 타임스탬프를 스트림의 controllerenqueue()에 알립니다. 마지막으로 컨트롤러에 스트림을 close()하도록 지시합니다. getReader() 메서드를 통해 리더를 만들고 스트림이 done가 될 때까지 read()를 호출하여 이 스트림을 사용합니다.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

비동기 반복

read() 루프 반복을 확인하여 스트림이 done인지 확인하는 것은 가장 편리한 API가 아닐 수 있습니다. 다행히 이 작업을 수행하는 더 나은 방법인 비동기 반복이 곧 제공될 예정입니다.

for await (const chunk of stream) {
  console.log(chunk);
}

현재 비동기 반복을 사용하는 해결 방법은 폴리필로 동작을 구현하는 것입니다.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

읽을 수 있는 스트림 티핑

ReadableStream 인터페이스의 tee() 메서드는 현재 읽을 수 있는 스트림을 티팅하여 두 개의 결과 브랜치가 포함된 2개의 요소 배열을 새 ReadableStream 인스턴스로 반환합니다. 이렇게 하면 두 리더가 동시에 스트림을 읽을 수 있습니다. 예를 들어 서버로부터 응답을 가져와 브라우저로 스트리밍하고 서비스 워커 캐시로도 스트리밍하려는 경우 서비스 워커에서 이를 수행할 수 있습니다. 응답 본문은 두 번 이상 사용할 수 없으므로 이를 위해서는 사본 두 개가 필요합니다. 스트림을 취소하려면 결과로 반환되는 두 브랜치를 모두 취소해야 합니다. 스트림을 티핑하면 일반적으로 일정 시간 동안 스트림이 잠기므로 다른 독자가 스트림을 잠글 수 없습니다.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

판독 가능한 바이트 스트림

바이트를 나타내는 스트림의 경우, 특히 사본을 최소화하여 바이트를 효율적으로 처리할 수 있도록 읽을 수 있는 스트림의 확장된 버전이 제공됩니다. 바이트 스트림을 사용하면 BYOB(Bring Your Own-buffer) 리더를 획득할 수 있습니다. 기본 구현은 WebSocket의 경우 문자열 또는 배열 버퍼와 같은 다양한 출력을 제공하는 반면, 바이트 스트림은 바이트 출력을 보장합니다. 또한 BYOB 리더는 안정성 측면에서 이점이 있습니다. 버퍼가 분리되면 동일한 버퍼에 두 번 쓰지 않도록 보장하여 경합 상태를 방지할 수 있기 때문입니다. BYOB 리더는 버퍼를 재사용할 수 있으므로 브라우저에서 가비지 컬렉션을 실행해야 하는 횟수를 줄일 수 있습니다.

읽을 수 있는 바이트 스트림 만들기

추가 type 매개변수를 ReadableStream() 생성자에 전달하여 읽을 수 있는 바이트 스트림을 만들 수 있습니다.

new ReadableStream({ type: 'bytes' });

underlyingSource

판독 가능한 바이트 스트림의 기본 소스에는 조작할 ReadableByteStreamController가 제공됩니다. ReadableByteStreamController.enqueue() 메서드는 값이 ArrayBufferViewchunk 인수를 사용합니다. 속성 ReadableByteStreamController.byobRequest는 현재 BYOB pull 요청을 반환하거나, 요청이 없는 경우 null을 반환합니다. 마지막으로 ReadableByteStreamController.desiredSize 속성은 제어되는 스트림의 내부 큐를 채우기 위해 원하는 크기를 반환합니다.

queuingStrategy

마찬가지로 선택사항인 ReadableStream() 생성자의 두 번째 인수는 queuingStrategy입니다. 이는 한 가지 매개변수를 사용하는 스트림의 대기열 전략을 선택적으로 정의하는 객체입니다.

  • highWaterMark: 이 대기열 전략을 사용하는 스트림의 높은 워터마크를 나타내는 음수가 아닌 바이트 수입니다. 이는 적절한 ReadableByteStreamController.desiredSize 속성을 통해 나타나는 백프레셔를 확인하는 데 사용됩니다. 또한 기본 소스의 pull() 메서드가 호출되는 시점을 제어합니다.

getReader()read() 메서드

그런 다음 mode 매개변수를 다음과 같이 설정하여 ReadableStreamBYOBReader에 액세스할 수 있습니다. ReadableStream.getReader({ mode: "byob" }) 이를 통해 버퍼 할당을 더 정밀하게 제어하여 사본을 피할 수 있습니다. 바이트 스트림에서 읽으려면 ReadableStreamBYOBReader.read(view)를 호출해야 합니다. 여기서 viewArrayBufferView입니다.

읽기 가능한 바이트 스트림 코드 샘플

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

다음 함수는 무작위로 생성된 배열의 제로 카피를 효율적으로 읽을 수 있는 판독 가능한 바이트 스트림을 반환합니다. 미리 결정된 청크 크기인 1,024를 사용하는 대신 개발자가 제공한 버퍼를 채우려고 시도하여 완전한 제어를 가능하게 합니다.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

쓰기 가능한 스트림의 메커니즘

쓰기 가능한 스트림은 데이터를 쓸 수 있는 대상으로, JavaScript에서 WritableStream 객체로 표현됩니다. 이는 원시 데이터가 기록되는 하위 수준 I/O 싱크인 기본 싱크 위에서 추상화 역할을 합니다.

데이터는 한 번에 하나씩 작성자를 통해 스트림에 기록됩니다. 청크는 리더의 청크처럼 여러 형태를 취할 수 있습니다. 원하는 코드를 사용하여 작성할 준비가 된 청크를 생성할 수 있습니다. 작성자와 관련 코드를 생산자라고 합니다.

작성자가 생성되어 스트림 (활성 작성자)에 쓰기를 시작하면 쓰기가 잠겨 있다고 합니다. 한 번에 한 명의 작성자만 쓰기 가능한 스트림에 쓸 수 있습니다. 다른 작성자가 스트림에 쓰기를 시작하게 하려면 일반적으로 다른 작성기를 연결하기 전에 해제해야 합니다.

내부 큐는 스트림에 작성되었지만 아직 기본 싱크에 의해 처리되지 않은 청크를 추적합니다.

큐 추가 전략은 내부 큐의 상태에 따라 스트림이 백프레셔 신호를 보내는 방식을 결정하는 객체입니다. 큐 전략은 각 청크에 크기를 할당하고 큐에 있는 모든 청크의 총 크기를 지정된 수(하이 워터마크라고 함)와 비교합니다.

최종 구조는 컨트롤러라고 합니다. 쓰기 가능한 각 스트림에는 스트림을 제어할 수 있는 컨트롤러가 연결되어 있습니다 (예: 스트림 취소).

쓰기 가능한 스트림 만들기

Streams API의 WritableStream 인터페이스는 대상(싱크라고 함)에 스트리밍 데이터를 쓰기 위한 표준 추상화를 제공합니다. 이 객체에는 백프레셔 및 큐가 내장되어 있습니다. 생성자 WritableStream()를 호출하여 쓰기 가능한 스트림을 만듭니다. 생성된 스트림 인스턴스의 작동 방식을 정의하는 메서드와 속성이 있는 객체를 나타내는 선택적 underlyingSink 매개변수가 있습니다.

underlyingSink

underlyingSink에는 다음과 같은 선택적 개발자 정의 메서드가 포함될 수 있습니다. 일부 메서드에 전달된 controller 매개변수는 WritableStreamDefaultController입니다.

  • start(controller): 이 메서드는 객체가 생성될 때 즉시 호출됩니다. 이 메서드의 콘텐츠는 기본 싱크에 액세스하는 것을 목표로 해야 합니다. 이 프로세스를 비동기식으로 실행하려면 성공 또는 실패를 알리는 프로미스를 반환할 수 있습니다.
  • write(chunk, controller): 이 메서드는 새로운 데이터 청크 (chunk 매개변수에 지정됨)를 기본 싱크에 쓸 준비가 되면 호출됩니다. 쓰기 작업의 성공 또는 실패를 알리기 위해 프로미스를 반환할 수 있습니다. 이 메서드는 이전 쓰기가 성공한 후에만 호출되며 스트림이 닫히거나 취소된 후에는 호출되지 않습니다.
  • close(controller): 앱이 스트림에 청크 작성을 완료했다고 알리는 경우 이 메서드가 호출됩니다. 콘텐츠는 기본 싱크에 쓰기를 완료하고 이에 대한 액세스 권한을 해제하는 데 필요한 모든 작업을 실행해야 합니다. 이 프로세스가 비동기식이면 성공 또는 실패를 알리는 프로미스를 반환할 수 있습니다. 이 메서드는 큐에 추가된 모든 쓰기가 성공한 후에만 호출됩니다.
  • abort(reason): 앱에서 스트림을 갑자기 닫고 오류 상태로 전환하겠다고 알리면 이 메서드가 호출됩니다. close()와 마찬가지로 보관된 리소스를 정리할 수 있지만 쓰기가 큐에 추가되더라도 abort()는 호출됩니다. 이 청크는 버려집니다. 이 프로세스가 비동기식이면 성공 또는 실패를 알리는 프로미스를 반환할 수 있습니다. reason 매개변수에는 스트림이 중단된 이유를 설명하는 DOMString가 포함됩니다.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API의 WritableStreamDefaultController 인터페이스는 쓰기를 위해 또는 작성이 끝날 때 더 많은 청크가 제출되므로 설정 중에 WritableStream의 상태를 제어할 수 있는 컨트롤러를 나타냅니다. WritableStream를 구성할 때 기본 싱크에는 조작할 상응하는 WritableStreamDefaultController 인스턴스가 제공됩니다. WritableStreamDefaultController에는 연결된 스트림과의 향후 상호작용에서 오류가 발생하게 하는 WritableStreamDefaultController.error() 메서드가 하나뿐입니다. 또한 WritableStreamDefaultControllerAbortSignal 인스턴스를 반환하는 signal 속성을 지원하므로 필요한 경우 WritableStream 작업을 중지할 수 있습니다.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

마찬가지로 선택사항인 WritableStream() 생성자의 두 번째 인수는 queuingStrategy입니다. 이 객체는 스트림의 대기열 전략을 선택적으로 정의하는 객체로, 다음 두 가지 매개변수를 사용합니다.

  • highWaterMark: 이 대기열 전략을 사용하는 스트림의 높은 워터마크를 나타내는 음수가 아닌 숫자입니다.
  • size(chunk): 지정된 청크 값의 음수가 아닌 유한 크기를 계산하고 반환하는 함수입니다. 결과는 적절한 WritableStreamDefaultWriter.desiredSize 속성을 통해 나타나는 백프레셔를 확인하는 데 사용됩니다.

getWriter()write() 메서드

쓰기 가능한 스트림에 쓰려면 작성자(WritableStreamDefaultWriter)가 필요합니다. WritableStream 인터페이스의 getWriter() 메서드는 WritableStreamDefaultWriter의 새 인스턴스를 반환하고 이 인스턴스로 스트림을 잠급니다. 스트림이 잠겨 있으면 현재 스트림이 해제될 때까지 다른 작성자를 획득할 수 없습니다.

WritableStreamDefaultWriter 인터페이스의 write() 메서드는 전달된 데이터 청크를 WritableStream 및 기본 싱크에 작성한 다음 쓰기 작업의 성공 또는 실패를 나타내는 프로미스를 반환합니다. '성공'의 의미는 기본 싱크에 따라 다릅니다. 청크가 수락되었음을 나타낼 수 있지만 최종 대상에 안전하게 저장된 것은 아닙니다.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked 속성

WritableStream.locked 속성에 액세스하여 쓰기 가능한 스트림이 잠겨 있는지 확인할 수 있습니다.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

쓰기 가능한 스트림 코드 샘플

아래의 코드 샘플은 작동 중인 모든 단계를 보여줍니다.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

읽을 수 있는 스트림을 쓰기 가능한 스트림으로 파이핑

읽을 수 있는 스트림은 읽을 수 있는 스트림의 pipeTo() 메서드를 통해 쓰기 가능한 스트림으로 파이핑될 수 있습니다. ReadableStream.pipeTo()는 현재 ReadableStream를 지정된 WritableStream로 파이핑하고 파이핑 프로세스가 성공적으로 완료되면 처리하는 프로미스를 반환하거나 오류가 발생하면 거부합니다.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

변환 스트림 만들기

Streams API의 TransformStream 인터페이스는 변환 가능한 데이터 세트를 나타냅니다. 지정된 핸들러에서 변환 스트림 객체를 만들고 반환하는 생성자 TransformStream()를 호출하여 변환 스트림을 만듭니다. TransformStream() 생성자는 transformer를 나타내는 선택적 JavaScript 객체를 첫 번째 인수로 받습니다. 이러한 객체에는 다음 메서드가 포함될 수 있습니다.

transformer

  • start(controller): 이 메서드는 객체가 생성될 때 즉시 호출됩니다. 이는 일반적으로 controller.enqueue()를 사용하여 프리픽스 청크를 큐에 추가하는 데 사용됩니다. 이러한 청크는 읽기 가능 측에서 읽히지만 쓰기 가능 측에 대한 쓰기에 의존하지 않습니다. 이 초기 프로세스가 비동기식인 경우(예: 프리픽스 청크를 획득하는 데 약간의 노력이 필요하기 때문에) 함수는 성공 또는 실패를 알리기 위해 프로미스를 반환할 수 있습니다. 거부된 프로미스는 스트림에 오류를 표시합니다. 발생한 예외는 TransformStream() 생성자에 의해 다시 발생합니다.
  • transform(chunk, controller): 이 메서드는 원래 쓰기 가능한 쪽에 작성된 새 청크를 변환할 준비가 되면 호출됩니다. 스트림 구현은 이전 변환에 성공한 후에만 이 함수가 호출되고 start()이 완료되기 전이나 flush()가 호출된 후에는 호출되지 않도록 보장합니다. 이 함수는 변환 스트림의 실제 변환 작업을 수행합니다. controller.enqueue()를 사용하여 결과를 큐에 추가할 수 있습니다. 이렇게 하면 controller.enqueue() 호출 횟수에 따라 단일 청크가 쓰기 가능한 쪽에 쓰여져 판독 가능한 쪽에 0개 또는 여러 개의 청크가 생성됩니다. 변환 프로세스가 비동기식인 경우 이 함수는 변환의 성공 또는 실패를 알리는 프로미스를 반환할 수 있습니다. promise가 거부되면 변환 스트림의 읽기 가능 측면과 쓰기 가능 측면 모두에서 오류가 발생합니다. transform() 메서드가 제공되지 않으면 항등식 변환이 사용되어 쓰기 가능한 쪽에서 읽기 가능한 쪽으로 변경되지 않은 청크를 큐에 추가합니다.
  • flush(controller): 이 메서드는 쓰기 가능한 측에 작성된 모든 청크가 transform()를 통해 성공적으로 변환되고 쓰기 가능한 측이 닫히려고 한 후에 호출됩니다. 일반적으로 이 방법은 서픽스 청크도 닫기 전에 읽을 수 있는 쪽에 서픽스 청크를 추가하는 데 사용됩니다. 플러시 프로세스가 비동기식이면 함수는 성공 또는 실패 신호를 보내기 위한 프로미스를 반환할 수 있습니다. 결과는 stream.writable.write()의 호출자에게 전달됩니다. 또한 promise가 거부되면 스트림의 읽기 가능 측면과 쓰기 가능 측면 모두에 오류가 발생합니다. 예외 발생은 거부된 프로미스를 반환하는 것과 동일하게 취급됩니다.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy 대기열 전략

TransformStream() 생성자의 두 번째 및 세 번째 선택적 매개변수는 선택적 writableStrategyreadableStrategy 큐 전략입니다. 읽기 가능 스트림 섹션과 쓰기 가능 스트림 섹션에 각각 설명된 대로 정의됩니다.

변환 스트림 코드 샘플

다음 코드 샘플은 실행 중인 간단한 변환 스트림을 보여줍니다.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

변환 스트림을 통해 읽을 수 있는 스트림 파이핑

ReadableStream 인터페이스의 pipeThrough() 메서드는 변환 스트림 또는 다른 쓰기 가능/읽기 가능 쌍을 통해 현재 스트림을 파이핑하는 체인 가능한 방법을 제공합니다. 스트림을 파이핑하면 일반적으로 파이프가 지속되는 동안 스트림이 잠기므로 다른 리더가 스트림을 잠글 수 없습니다.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

다음 코드 샘플 (약간 엉뚱함)에서는 반환된 응답 프로미스를 스트림으로 사용하고 청크별로 청크를 대문자로 하여 모든 텍스트를 대문자하는 fetch()의 '소리내는' 버전을 구현하는 방법을 보여줍니다. 이 접근 방식의 장점은 전체 문서가 다운로드될 때까지 기다릴 필요가 없다는 것입니다. 따라서 대용량 파일을 다룰 때 큰 차이를 만들 수 있습니다.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

데모

아래 데모는 작동 중인 읽기, 쓰기, 변환 스트림을 보여줍니다. 또한 pipeThrough()pipeTo() 파이프 체인의 예와 tee()를 보여줍니다. 원하는 경우 자체 창에서 데모를 실행하거나 소스 코드를 볼 수 있습니다.

브라우저에서 유용한 스트림 이용 가능

브라우저에는 여러 가지 유용한 스트림이 내장되어 있습니다. blob에서 쉽게 ReadableStream를 만들 수 있습니다. Blob 인터페이스의 stream() 메서드는 읽을 때 blob에 포함된 데이터를 반환하는 ReadableStream를 반환합니다. 또한 File 객체는 특정 종류의 Blob이며 blob이 사용할 수 있는 모든 컨텍스트에서 사용할 수 있습니다.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode()의 스트리밍 변형을 각각 TextDecoderStreamTextEncoderStream이라고 합니다.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

CompressionStream 변환 스트림과 DecompressionStream 변환 스트림을 사용하면 파일을 쉽게 압축하거나 압축 해제할 수 있습니다. 아래의 코드 샘플은 스트림 사양을 다운로드하고, 브라우저에서 바로 압축 (gzip)하고, 압축된 파일을 디스크에 직접 쓰는 방법을 보여줍니다.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access APIFileSystemWritableFileStream 및 실험용 fetch() 요청 스트림은 실제 환경에서 쓰기 가능한 스트림의 예입니다.

Serial API는 읽기 및 쓰기 가능한 스트림을 모두 많이 사용합니다.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

마지막으로 WebSocketStream API는 스트림을 WebSocket API와 통합합니다.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

유용한 리소스

감사의 말

이 자료는 Jake Archibald, François Boaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley, Adam Rice가 검토했습니다. 제이크 아치볼드의 블로그 게시물은 스트림을 이해하는 데 많은 도움이 되었습니다. 일부 코드 샘플은 GitHub 사용자 @bellbind의 탐색 분석과 스트림의 MDN Web Docs에 기반한 산문 빌드에서 영감을 받았습니다. Streams Standard작성자가 이 사양을 대단히 작성했습니다. UnsplashRyan Lara가 히어로 이미지를 작성했습니다.