Streams - 최종 가이드

Streams API를 사용하여 읽기, 쓰기, 변환 가능한 스트림을 사용하는 방법을 알아봅니다.

Streams API를 사용하면 네트워크를 통해 수신되거나 로컬에서 어떤 수단으로든 생성된 데이터 스트림에 프로그래매틱 방식으로 액세스하고 JavaScript로 처리할 수 있습니다. 스트리밍은 수신, 전송 또는 변환하려는 리소스를 작은 청크로 분할한 다음 이러한 청크를 조금씩 처리하는 것을 말합니다. 스트리밍은 웹페이지에 표시할 HTML이나 동영상과 같은 애셋을 수신할 때 브라우저에서 실행하는 작업이지만, 2015년에 스트림이 포함된 fetch가 도입되기 전에는 JavaScript에서 이 기능을 사용할 수 없었습니다.

이전에는 동영상이나 텍스트 파일과 같은 리소스를 처리하려면 전체 파일을 다운로드하고 적절한 형식으로 역직렬화될 때까지 기다린 후 처리해야 했습니다. JavaScript에서 스트림을 사용할 수 있게 되면서 모든 것이 달라집니다. 이제 버퍼, 문자열 또는 blob을 생성하지 않고도 클라이언트에서 사용할 수 있는 즉시 JavaScript로 원시 데이터를 점진적으로 처리할 수 있습니다. 이를 통해 다양한 사용 사례를 활용할 수 있습니다. 다음은 그중 일부입니다.

  • 동영상 효과: 실시간으로 효과를 적용하는 변환 스트림을 통해 읽을 수 있는 동영상 스트림을 파이핑합니다.
  • 데이터 (복호화) 압축: 파일 스트림을 선택적으로 (복호화) 압축하는 변환 스트림을 통해 파일 스트림을 파이프합니다.
  • 이미지 디코딩: 바이트 데이터를 비트맵 데이터로 디코딩하는 변환 스트림을 통해 HTTP 응답 스트림을 파이프한 다음 비트맵을 PNG로 변환하는 다른 변환 스트림을 통해 파이프합니다. 서비스 워커의 fetch 핸들러 내에 설치하면 AVIF와 같은 새 이미지 형식을 투명하게 폴리필할 수 있습니다.

브라우저 지원

ReadableStream 및 WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

핵심 개념

다양한 유형의 스트림에 대해 자세히 알아보기 전에 몇 가지 핵심 개념을 소개하겠습니다.

청크

청크는 스트림에 쓰거나 스트림에서 읽는 단일 데이터입니다. 스트림은 어떤 유형이든 될 수 있으며, 스트림에 서로 다른 유형의 청크가 포함될 수도 있습니다. 대부분의 경우 청크는 특정 스트림의 가장 원자적인 데이터 단위가 아닙니다. 예를 들어 바이트 스트림에는 단일 바이트 대신 16KiB Uint8Array 단위로 구성된 청크가 포함될 수 있습니다.

읽을 수 있는 스트림

읽을 수 있는 스트림은 읽을 수 있는 데이터 소스를 나타냅니다. 즉, 읽을 수 있는 스트림에서 데이터가 나옴 구체적으로 읽을 수 있는 스트림은 ReadableStream 클래스의 인스턴스입니다.

쓰기 가능한 스트림

쓰기 가능한 스트림은 데이터를 쓸 수 있는 대상을 나타냅니다. 즉, 데이터가 쓰기 가능한 스트림으로 전송됩니다. 구체적으로 쓰기 가능한 스트림은 WritableStream 클래스의 인스턴스입니다.

스트림 변환

변환 스트림은 스트림 쌍으로 구성됩니다. 쓰기 가능한 측면이라고 하는 쓰기 가능한 스트림과 읽기 가능한 측면이라고 하는 읽기 가능한 스트림입니다. 이를 실생활에 비유하자면 한 언어에서 다른 언어로 실시간으로 번역하는 동시 통역자라고 할 수 있습니다. 변환 스트림에 따라 쓰기 가능한 측면에 쓰면 읽기 가능한 측면에서 새 데이터를 읽을 수 있습니다. 구체적으로 writable 속성과 readable 속성이 있는 모든 객체는 변환 스트림으로 사용할 수 있습니다. 하지만 표준 TransformStream 클래스를 사용하면 올바르게 얽힌 이러한 쌍을 더 쉽게 만들 수 있습니다.

파이프 체인

스트림은 주로 서로 파이프하여 사용합니다. 읽기 가능한 스트림은 읽기 가능한 스트림의 pipeTo() 메서드를 사용하여 쓰기 가능한 스트림으로 직접 파이프할 수 있습니다. 또는 읽기 가능한 스트림의 pipeThrough() 메서드를 사용하여 먼저 하나 이상의 변환 스트림을 통해 파이프할 수 있습니다. 이러한 방식으로 함께 파이프된 스트림 세트를 파이프 체인이라고 합니다.

역압력

파이프 체인이 구성되면 청크가 파이프 체인을 통해 얼마나 빨리 흐르는지에 관한 신호가 전파됩니다. 체인의 단계에서 아직 청크를 수용할 수 없는 경우 파이프 체인을 통해 신호를 역방향으로 전파하여 결국 원본 소스에 청크 생성을 중지하도록 지시합니다. 이러한 흐름을 정규화하는 프로세스를 백프레셔라고 합니다.

티잉

읽을 수 있는 스트림은 tee() 메서드를 사용하여 티 (대문자 'T' 모양의 이름)할 수 있습니다. 이렇게 하면 스트림이 잠겨 더 이상 직접 사용할 수 없게 됩니다. 하지만 독립적으로 사용할 수 있는 브랜치라는 두 개의 새 스트림이 생성됩니다. 스트림을 되감거나 다시 시작할 수 없으므로 시작도 중요합니다. 자세한 내용은 나중에 설명합니다.

가져오기 API 호출에서 비롯된 읽기 가능한 스트림으로 구성된 파이프 체인의 다이어그램. 이 스트림은 출력이 분기된 변환 스트림을 통해 파이프되고 첫 번째 결과 읽기 가능한 스트림의 경우 브라우저로, 두 번째 결과 읽기 가능한 스트림의 경우 서비스 워커 캐시로 전송됩니다.
파이프 체인

읽을 수 있는 스트림의 메커니즘

읽을 수 있는 스트림은 JavaScript에서 기본 소스에서 흐르는 ReadableStream 객체로 표현되는 데이터 소스입니다. ReadableStream() 생성자는 지정된 핸들러에서 읽을 수 있는 스트림 객체를 만들고 반환합니다. 기본 소스에는 두 가지 유형이 있습니다.

  • 푸시 소스는 액세스할 때마다 지속적으로 데이터를 푸시하며, 스트림에 대한 액세스를 시작, 일시중지 또는 취소하는 것은 사용자의 책임입니다. 라이브 동영상 스트림, 서버 전송 이벤트 또는 WebSockets를 예로 들 수 있습니다.
  • 가져오기 소스는 연결된 후 소스에서 데이터를 명시적으로 요청해야 합니다. 예를 들어 fetch() 또는 XMLHttpRequest 호출을 통한 HTTP 작업이 있습니다.

스트림 데이터는 청크라는 작은 단위로 순차적으로 읽힙니다. 스트림에 배치된 청크는 대기열에 추가되었다고 합니다. 즉, 읽을 준비가 되어 대기열에 있습니다. 내부 대기열은 아직 읽지 않은 청크를 추적합니다.

대기열 전략은 스트림이 내부 대기열의 상태에 따라 백프레셔를 신호하는 방법을 결정하는 객체입니다. 큐잉 전략은 각 청크에 크기를 할당하고 큐에 있는 모든 청크의 총 크기를 최고 수위라고 하는 지정된 숫자와 비교합니다.

스트림 내의 청크는 리더가 읽습니다. 이 리더는 한 번에 하나의 청크씩 데이터를 검색하므로 원하는 종류의 작업을 할 수 있습니다. 리더와 함께 제공되는 다른 처리 코드를 소비자라고 합니다.

이 컨텍스트에서 다음 구성은 컨트롤러라고 합니다. 읽을 수 있는 각 스트림에는 이름에서 알 수 있듯이 스트림을 제어할 수 있는 연결된 컨트롤러가 있습니다.

한 번에 하나의 리더만 스트림을 읽을 수 있습니다. 리더가 생성되고 스트림 읽기를 시작하면(즉, 활성 리더가 됨) 잠김 상태가 됩니다. 다른 리더가 스트림 읽기를 대신하도록 하려면 일반적으로 다른 작업을 수행하기 전에 첫 번째 리더를 해제해야 합니다(스트림을 티잉할 수는 있음).

읽을 수 있는 스트림 만들기

생성자 ReadableStream()를 호출하여 읽을 수 있는 스트림을 만듭니다. 생성자에는 생성된 스트림 인스턴스의 동작 방식을 정의하는 메서드와 속성이 있는 객체를 나타내는 선택적 인수 underlyingSource가 있습니다.

underlyingSource

다음과 같은 개발자 정의 옵션 메서드를 사용할 수 있습니다.

  • start(controller): 객체가 생성될 때 즉시 호출됩니다. 이 메서드는 스트림 소스에 액세스하고 스트림 기능을 설정하는 데 필요한 다른 작업을 실행할 수 있습니다. 이 프로세스를 비동기식으로 실행하려면 메서드가 성공 또는 실패를 알리는 프로미스를 반환할 수 있습니다. 이 메서드에 전달된 controller 매개변수는 ReadableStreamDefaultController입니다.
  • pull(controller): 더 많은 청크를 가져올 때 스트림을 제어하는 데 사용할 수 있습니다. 스트림의 내부 청크 큐가 가득 찰 때까지, 즉 큐가 최고 수위에 도달할 때까지 반복적으로 호출됩니다. pull() 호출의 결과가 약속인 경우 해당 약속이 충족될 때까지 pull()가 다시 호출되지 않습니다. 프라미스가 거부되면 스트림에 오류가 발생합니다.
  • cancel(reason): 스트림 소비자가 스트림을 취소할 때 호출됩니다.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController는 다음 메서드를 지원합니다.

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

ReadableStream() 생성자의 두 번째 인수(선택사항)는 queuingStrategy입니다. 스트림의 큐잉 전략을 선택적으로 정의하는 객체로, 두 가지 매개변수를 사용합니다.

  • highWaterMark: 이 큐 전략을 사용하는 스트림의 최고 수위를 나타내는 비음수입니다.
  • size(chunk): 지정된 청크 값의 유한한 양수 크기를 계산하고 반환하는 함수입니다. 결과는 백프레셔를 결정하는 데 사용되며 적절한 ReadableStreamDefaultController.desiredSize 속성을 통해 표시됩니다. 또한 기본 소스의 pull() 메서드가 호출되는 시점도 지정합니다.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader()read() 메서드

읽을 수 있는 스트림에서 읽으려면 ReadableStreamDefaultReader인 리더가 필요합니다. ReadableStream 인터페이스의 getReader() 메서드는 리더를 만들고 스트림을 리더에 잠급니다. 스트림이 잠겨 있는 동안에는 이 리더가 해제될 때까지 다른 리더를 획득할 수 없습니다.

ReadableStreamDefaultReader 인터페이스의 read() 메서드는 스트림의 내부 대기열에 있는 다음 청크에 대한 액세스를 제공하는 프로미스를 반환합니다. 스트림의 상태에 따라 결과를 사용하여 처리하거나 거부합니다. 가능한 경우는 다음과 같습니다.

  • 청크를 사용할 수 있는 경우 약속은
    { value: chunk, done: false } 형식의 객체로 처리됩니다.
  • 스트림이 닫히면 약속은
    { value: undefined, done: true } 형식의 객체로 처리됩니다.
  • 스트림에 오류가 발생하면 관련 오류와 함께 프라미스가 거부됩니다.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked 속성

읽을 수 있는 스트림이 잠겨 있는지 확인하려면 ReadableStream.locked 속성에 액세스하면 됩니다.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

읽기 가능한 스트림 코드 샘플

아래 코드 샘플은 모든 단계가 실행되는 모습을 보여줍니다. 먼저 underlyingSource 인수 (즉, TimestampSource 클래스)에서 start() 메서드를 정의하는 ReadableStream를 만듭니다. 이 메서드는 스트림의 controller에 10초 동안 1초마다 타임스탬프를 enqueue()하도록 지시합니다. 마지막으로 컨트롤러에 스트림을 close()하도록 지시합니다. getReader() 메서드를 통해 리더를 만들고 스트림이 done이 될 때까지 read()를 호출하여 이 스트림을 사용합니다.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

비동기 반복

read() 루프 반복 시 스트림이 done인지 확인하는 것이 가장 편리한 API가 아닐 수 있습니다. 다행히 곧 더 나은 방법인 비동기 반복이 제공될 예정입니다.

for await (const chunk of stream) {
  console.log(chunk);
}

현재 비동기 반복을 사용하는 해결 방법은 폴리필로 동작을 구현하는 것입니다.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

읽을 수 있는 스트림을 티잉

ReadableStream 인터페이스의 tee() 메서드는 현재 읽을 수 있는 스트림을 티하여 두 개의 결과 브랜치를 새 ReadableStream 인스턴스로 포함하는 두 요소 배열을 반환합니다. 이렇게 하면 두 명의 리더가 스트림을 동시에 읽을 수 있습니다. 예를 들어 서버에서 응답을 가져와 브라우저로 스트리밍하는 동시에 서비스 워커 캐시로 스트리밍하려는 경우 서비스 워커에서 이 작업을 실행할 수 있습니다. 응답 본문은 두 번 이상 사용할 수 없으므로 이렇게 하려면 두 개의 사본이 필요합니다. 그런 다음 스트림을 취소하려면 결과 브랜치 두 개를 모두 취소해야 합니다. 스트림을 티하면 일반적으로 해당 기간 동안 스트림이 잠겨 다른 리더가 스트림을 잠그지 못하게 됩니다.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

읽을 수 있는 바이트 스트림

바이트가 포함된 스트림의 경우, 특히 사본을 최소화하여 바이트가 효율적으로 처리되도록 읽을 수 있는 스트림의 확장 버전이 제공됩니다. 바이트 스트림을 사용하면 Bring Your Own Buffer(BYOB) 리더를 획득할 수 있습니다. 기본 구현은 WebSocket의 경우 문자열이나 배열 버퍼와 같은 다양한 출력을 제공할 수 있지만, 바이트 스트림은 바이트 출력을 보장합니다. 또한 BYOB 리더는 안정성 이점이 있습니다. 버퍼가 분리되면 동일한 버퍼에 두 번 쓰지 않도록 보장할 수 있으므로 경합 상태를 방지할 수 있기 때문입니다. BYOB 리더는 버퍼를 재사용할 수 있으므로 브라우저에서 가비지 컬렉션을 실행해야 하는 횟수를 줄일 수 있습니다.

읽을 수 있는 바이트 스트림 만들기

ReadableStream() 생성자에 type 매개변수를 추가로 전달하여 읽을 수 있는 바이트 스트림을 만들 수 있습니다.

new ReadableStream({ type: 'bytes' });

underlyingSource

읽을 수 있는 바이트 스트림의 기본 소스에 조작할 ReadableByteStreamController가 제공됩니다. ReadableByteStreamController.enqueue() 메서드는 값이 ArrayBufferViewchunk 인수를 사용합니다. ReadableByteStreamController.byobRequest 속성은 현재 BYOB 풀 리퀘스트를 반환하거나 풀 리퀘스트가 없는 경우 null을 반환합니다. 마지막으로 ReadableByteStreamController.desiredSize 속성은 제어된 스트림의 내부 대기열을 채우기 위한 원하는 크기를 반환합니다.

queuingStrategy

ReadableStream() 생성자의 두 번째 인수(선택사항)는 queuingStrategy입니다. 스트림의 큐잉 전략을 선택적으로 정의하는 객체로, 다음과 같은 하나의 매개변수를 사용합니다.

  • highWaterMark: 이 큐잉 전략을 사용하는 스트림의 최대 수위를 나타내는 음수 이외의 바이트 수입니다. 이는 적절한 ReadableByteStreamController.desiredSize 속성을 통해 표시되는 백프레셔를 결정하는 데 사용됩니다. 또한 기본 소스의 pull() 메서드가 호출되는 시점도 지정합니다.

getReader()read() 메서드

그런 다음 mode 매개변수를 적절하게 설정하여 ReadableStreamBYOBReader에 액세스할 수 있습니다. ReadableStream.getReader({ mode: "byob" }) 이를 통해 복사를 방지하기 위해 버퍼 할당을 더 정확하게 제어할 수 있습니다. 바이트 스트림에서 읽으려면 ReadableStreamBYOBReader.read(view)를 호출해야 합니다. 여기서 viewArrayBufferView입니다.

읽을 수 있는 바이트 스트림 코드 샘플

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

다음 함수는 무작위로 생성된 배열을 효율적으로 제로 복사 읽기를 허용하는 읽을 수 있는 바이트 스트림을 반환합니다. 사전 결정된 청크 크기 1,024개를 사용하는 대신 개발자가 제공한 버퍼를 채우려고 시도하여 전체를 제어할 수 있습니다.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

쓰기 가능한 스트림의 메커니즘

쓰기 가능한 스트림은 데이터를 쓸 수 있는 대상이며 JavaScript에서는 WritableStream 객체로 표현됩니다. 이는 원시 데이터가 쓰여지는 하위 수준 I/O 싱크인 기본 싱크 위에 있는 추상화 역할을 합니다.

데이터는 작성기를 통해 한 번에 한 청크씩 스트림에 기록됩니다. 청크는 리더의 청크와 마찬가지로 다양한 형식을 취할 수 있습니다. 원하는 코드를 사용하여 쓰기에 준비된 청크를 생성할 수 있습니다. 작성자와 연결된 코드를 생산자라고 합니다.

작성자가 생성되고 스트림에 쓰기를 시작하면 (활성 작성자) 스트림에 잠겨 있다고 합니다. 한 번에 한 명의 작성자만 쓰기 가능한 스트림에 쓸 수 있습니다. 다른 작성자가 스트림에 쓰기를 시작하려면 일반적으로 스트림을 해제한 다음 다른 작성자를 연결해야 합니다.

내부 대기열은 스트림에 작성되었지만 아직 기본 싱크에서 처리되지 않은 청크를 추적합니다.

대기열 전략은 스트림이 내부 대기열의 상태에 따라 백프레셔를 신호하는 방법을 결정하는 객체입니다. 큐잉 전략은 각 청크에 크기를 할당하고 큐에 있는 모든 청크의 총 크기를 최고 수위라고 하는 지정된 숫자와 비교합니다.

최종 구성은 컨트롤러라고 합니다. 쓰기 가능한 각 스트림에는 스트림을 제어할 수 있는 연결된 컨트롤러가 있습니다 (예: 중단).

쓰기 가능한 스트림 만들기

Streams API의 WritableStream 인터페이스는 싱크라고 하는 대상에 스트리밍 데이터를 쓰기 위한 표준 추상화를 제공합니다. 이 객체에는 백프레셔 및 큐잉이 내장되어 있습니다. 생성자 WritableStream()를 호출하여 쓰기 가능한 스트림을 만듭니다. 구성된 스트림 인스턴스의 동작 방식을 정의하는 메서드와 속성이 있는 객체를 나타내는 선택적 underlyingSink 매개변수가 있습니다.

underlyingSink

underlyingSink는 다음과 같은 개발자 정의 메서드(선택사항)를 포함할 수 있습니다. 일부 메서드에 전달되는 controller 매개변수는 WritableStreamDefaultController입니다.

  • start(controller): 이 메서드는 객체가 생성될 때 즉시 호출됩니다. 이 메서드의 콘텐츠는 기본 싱크에 액세스하는 것을 목표로 해야 합니다. 이 프로세스를 비동기식으로 실행하려면 성공 또는 실패를 알리는 프로미스를 반환하면 됩니다.
  • write(chunk, controller): 이 메서드는 새 데이터 청크 (chunk 매개변수에 지정됨)를 기본 싱크에 쓸 준비가 되면 호출됩니다. 쓰기 작업의 성공 또는 실패를 알리는 프라미스를 반환할 수 있습니다. 이 메서드는 이전 쓰기가 성공한 후에만 호출되며 스트림이 닫히거나 중단된 후에는 호출되지 않습니다.
  • close(controller): 앱이 스트림에 청크 쓰기를 완료했다고 신호하면 이 메서드가 호출됩니다. 콘텐츠는 기본 싱크에 대한 쓰기를 완료하고 액세스 권한을 해제하는 데 필요한 모든 작업을 실행해야 합니다. 이 프로세스가 비동기식인 경우 성공 또는 실패를 알리는 약속을 반환할 수 있습니다. 이 메서드는 대기열에 추가된 모든 쓰기가 완료된 후에만 호출됩니다.
  • abort(reason): 앱이 스트림을 갑자기 닫고 오류 상태로 전환하겠다고 신호를 보내면 이 메서드가 호출됩니다. close()와 마찬가지로 보류된 리소스를 정리할 수 있지만 쓰기가 대기열에 추가되더라도 abort()가 호출됩니다. 이러한 청크는 삭제됩니다. 이 프로세스가 비동기식인 경우 성공 또는 실패를 알리는 프라미스를 반환할 수 있습니다. reason 매개변수에는 스트림이 중단된 이유를 설명하는 DOMString가 포함됩니다.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API의 WritableStreamDefaultController 인터페이스는 설정 중에, 쓰기를 위해 더 많은 청크가 제출될 때 또는 쓰기가 끝날 때 WritableStream의 상태를 제어할 수 있는 컨트롤러를 나타냅니다. WritableStream를 생성할 때 기본 싱크에는 조작할 상응하는 WritableStreamDefaultController 인스턴스가 제공됩니다. WritableStreamDefaultController에는 WritableStreamDefaultController.error()라는 메서드 하나만 있으며, 이 메서드를 사용하면 향후 연결된 스트림과의 상호작용 시 오류가 발생합니다. WritableStreamDefaultControllerAbortSignal의 인스턴스를 반환하는 signal 속성도 지원하므로 필요한 경우 WritableStream 작업을 중지할 수 있습니다.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() 생성자의 두 번째 인수(선택사항)는 queuingStrategy입니다. 스트림의 큐잉 전략을 선택적으로 정의하는 객체로, 두 가지 매개변수를 사용합니다.

  • highWaterMark: 이 큐 전략을 사용하는 스트림의 최고 수위를 나타내는 비음수입니다.
  • size(chunk): 지정된 청크 값의 유한한 양수 크기를 계산하고 반환하는 함수입니다. 이 결과는 백프레셔를 결정하는 데 사용되며 적절한 WritableStreamDefaultWriter.desiredSize 속성을 통해 표시됩니다.

getWriter()write() 메서드

쓰기 가능한 스트림에 쓰려면 작성자(WritableStreamDefaultWriter)가 필요합니다. WritableStream 인터페이스의 getWriter() 메서드는 새 WritableStreamDefaultWriter 인스턴스를 반환하고 해당 인스턴스에 스트림을 잠급니다. 스트림이 잠겨 있는 동안에는 현재 작성자가 해제될 때까지 다른 작성자를 획득할 수 없습니다.

WritableStreamDefaultWriter 인터페이스의 write() 메서드는 전달된 데이터 청크를 WritableStream 및 기본 싱크에 쓰고 나서 쓰기 작업의 성공 또는 실패를 나타내도록 확인되는 프라미스를 반환합니다. '성공'의 의미는 기본 싱크에 따라 다릅니다. 청크가 수락되었음을 나타낼 수 있지만 반드시 최종 대상에 안전하게 저장되었음을 나타내는 것은 아닙니다.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked 속성

쓰기 가능한 스트림이 잠겨 있는지 확인하려면 WritableStream.locked 속성에 액세스하면 됩니다.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

쓰기 가능한 스트림 코드 샘플

아래의 코드 샘플은 모든 단계를 보여줍니다.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

읽을 수 있는 스트림을 쓰기 가능한 스트림으로 파이핑

읽기 가능한 스트림은 읽기 가능한 스트림의 pipeTo() 메서드를 통해 쓰기 가능한 스트림으로 파이프할 수 있습니다. ReadableStream.pipeTo()는 현재 ReadableStream를 지정된 WritableStream에 파이프하고 파이핑 프로세스가 성공적으로 완료되면 처리되는 프라미스를 반환하거나 오류가 발생하면 거부합니다.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

변환 스트림 만들기

Streams API의 TransformStream 인터페이스는 변환 가능한 데이터 집합을 나타냅니다. 생성자 TransformStream()를 호출하여 변환 스트림을 만듭니다. 이 생성자는 지정된 핸들러에서 변환 스트림 객체를 만들고 반환합니다. TransformStream() 생성자는 transformer를 나타내는 선택적 JavaScript 객체를 첫 번째 인수로 받습니다. 이러한 객체는 다음 메서드 중 하나를 포함할 수 있습니다.

transformer

  • start(controller): 이 메서드는 객체가 생성될 때 즉시 호출됩니다. 일반적으로 controller.enqueue()를 사용하여 접두어 청크를 큐에 추가하는 데 사용됩니다. 이러한 청크는 읽기 가능한 측면에서 읽히지만 쓰기 가능한 측면의 쓰기에 종속되지 않습니다. 이 초기 프로세스가 비동기식인 경우(예: 접두사 청크를 가져오는 데 약간의 노력이 필요함) 함수는 성공 또는 실패를 알리는 약속을 반환할 수 있습니다. 거부된 약속은 스트림에 오류를 발생시킵니다. 발생한 예외는 TransformStream() 생성자에 의해 다시 발생합니다.
  • transform(chunk, controller): 이 메서드는 원래 쓰기 가능한 측면에 작성된 새 청크를 변환할 준비가 되면 호출됩니다. 스트림 구현은 이 함수가 이전 변환이 성공한 후에만 호출되고 start()가 완료되기 전이나 flush()가 호출된 후에는 호출되지 않도록 보장합니다. 이 함수는 변환 스트림의 실제 변환 작업을 실행합니다. controller.enqueue()를 사용하여 결과를 큐에 추가할 수 있습니다. 이렇게 하면 쓰기 가능한 측면에 쓰여진 단일 청크가 controller.enqueue()가 호출된 횟수에 따라 읽기 가능한 측면에서 0개 또는 여러 개의 청크가 될 수 있습니다. 변환 프로세스가 비동기식인 경우 이 함수는 변환의 성공 또는 실패를 알리는 약속을 반환할 수 있습니다. 거부된 프라미스는 변환 스트림의 읽기 가능 측면과 쓰기 가능 측면 모두에서 오류가 발생합니다. transform() 메서드가 제공되지 않으면 쓰기 가능한 측면에서 읽기 가능한 측면으로 변경되지 않은 청크를 큐에 추가하는 ID 변환이 사용됩니다.
  • flush(controller): 이 메서드는 쓰기 가능한 측면에 쓰여진 모든 청크가 transform()를 통해 성공적으로 전달되어 변환된 후 쓰기 가능한 측면이 닫히려고 할 때 호출됩니다. 일반적으로 이는 읽을 수 있는 쪽이 닫히기 전에 접미사 청크를 읽을 수 있는 쪽에 큐에 추가하는 데 사용됩니다. 플러싱 프로세스가 비동기식인 경우 함수는 성공 또는 실패를 알리는 프라미스를 반환할 수 있습니다. 결과는 stream.writable.write()의 호출자에게 전달됩니다. 또한 거부된 약속은 스트림의 읽기 가능 측면과 쓰기 가능 측면 모두에서 오류가 발생합니다. 예외를 발생시키는 것은 거부된 약속을 반환하는 것과 동일하게 취급됩니다.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy 큐 전략

TransformStream() 생성자의 두 번째 및 세 번째 선택적 매개변수는 선택적 writableStrategyreadableStrategy 큐 전략입니다. 이는 각각 읽기 가능쓰기 가능 스트림 섹션에 설명된 대로 정의됩니다.

스트림 변환 코드 샘플

다음 코드 샘플은 간단한 변환 스트림이 작동하는 모습을 보여줍니다.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

읽을 수 있는 스트림을 변환 스트림을 통해 파이핑

ReadableStream 인터페이스의 pipeThrough() 메서드는 변환 스트림 또는 기타 쓰기 가능/읽기 가능 쌍을 통해 현재 스트림을 파이프하는 체이닝 가능한 방법을 제공합니다. 스트림을 파이프하면 일반적으로 파이프가 진행되는 동안 스트림이 잠겨 다른 리더가 스트림을 잠그지 못하게 됩니다.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

다음 코드 샘플 (약간 인위적)은 반환된 응답 약속을 스트림으로 소비하고 청크별로 대문자로 변환하여 모든 텍스트를 대문자로 표시하는 fetch()의 '소리치기' 버전을 구현하는 방법을 보여줍니다. 이 접근 방식의 장점은 전체 문서가 다운로드될 때까지 기다릴 필요가 없다는 점입니다. 이는 대용량 파일을 처리할 때 큰 차이를 줄 수 있습니다.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

데모

아래 데모에서는 읽기, 쓰기, 변환 스트림이 작동하는 모습을 보여줍니다. 또한 pipeThrough()pipeTo() 파이프 체인의 예시를 포함하고 tee()도 보여줍니다. 원하는 경우 자체 창에서 데모를 실행하거나 소스 코드를 볼 수 있습니다.

브라우저에서 사용할 수 있는 유용한 스트림

브라우저에 내장된 여러 유용한 스트림이 있습니다. blob에서 ReadableStream를 쉽게 만들 수 있습니다. Blob 인터페이스의 stream() 메서드는 읽을 때 blob 내에 포함된 데이터를 반환하는 ReadableStream를 반환합니다. 또한 File 객체는 특정 종류의 Blob이며 blob이 사용할 수 있는 모든 컨텍스트에서 사용할 수 있습니다.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode()의 스트리밍 변형은 각각 TextDecoderStreamTextEncoderStream라고 합니다.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

CompressionStreamDecompressionStream 변환 스트림을 사용하면 파일을 쉽게 압축하거나 압축 해제할 수 있습니다. 아래 코드 샘플은 스트림 사양을 다운로드하고 브라우저에서 바로 압축 (gzip)한 후 압축된 파일을 디스크에 직접 쓰는 방법을 보여줍니다.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access APIFileSystemWritableFileStream 및 실험용 fetch() 요청 스트림은 실제 작성 가능한 스트림의 예입니다.

Serial API는 읽기 가능하고 쓰기 가능한 스트림을 모두 많이 사용합니다.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

마지막으로 WebSocketStream API는 스트림을 WebSocket API와 통합합니다.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

유용한 리소스

감사의 말씀

이 도움말은 제이크 아치볼드, 프랑소와 보포르, 샘 더튼, 마티아스 불렌스, 수르마, 조 미들리, 아담 라이스가 검토했습니다. 제이크 아치볼드의 블로그 게시물은 스트림을 이해하는 데 큰 도움이 되었습니다. 일부 코드 샘플은 GitHub 사용자 @bellbind의 탐색에서 영감을 얻었으며 스트림에 관한 MDN 웹 문서를 기반으로 작성되었습니다. 스트림 표준작성자가 이 사양을 작성하는 데 큰 도움을 주었습니다. 히어로 이미지는 UnsplashRyan Lara님이 제공해 주셨습니다.