스트림 API를 사용하여 읽기 가능, 쓰기 가능, 변환 스트림을 사용하는 방법을 알아봅니다.
Streams API를 사용하면 네트워크를 통해 수신되거나 로컬에서 어떤 방식으로든 생성된 데이터 스트림에 프로그래매틱 방식으로 액세스하고 JavaScript로 처리할 수 있습니다. 스트리밍은 수신, 전송 또는 변환하려는 리소스를 작은 청크로 분해한 다음 이러한 청크를 비트 단위로 처리하는 것을 말합니다. 스트리밍은 웹페이지에 표시할 HTML이나 동영상과 같은 애셋을 수신할 때 브라우저가 어쨌든 실행하는 작업이지만, 이 기능은 2015년에 스트림이 포함된 fetch
가 도입되기 전에는 JavaScript에서 사용할 수 없었습니다.
이전에는 동영상이나 텍스트 파일 등 리소스를 처리하려면 전체 파일을 다운로드하고 적절한 형식으로 역직렬화될 때까지 기다린 후 처리해야 했습니다. JavaScript에서 스트림을 사용할 수 있게 되면서 이 모든 것이 달라집니다. 이제 버퍼, 문자열 또는 blob을 생성하지 않고도 클라이언트에서 사용할 수 있게 되는 즉시 JavaScript로 원시 데이터를 점진적으로 처리할 수 있습니다. 이를 통해 다양한 사용 사례를 활용할 수 있으며, 그중 일부는 아래에 나열되어 있습니다.
- 동영상 효과: 효과를 실시간으로 적용하는 변환 스트림을 통해 읽을 수 있는 동영상 스트림을 파이핑합니다.
- 데이터 압축/해제: 선택적으로 압축/해제하는 변환 스트림을 통해 파일 스트림을 파이프합니다.
- 이미지 디코딩: 바이트를 비트맵 데이터로 디코딩하는 변환 스트림과 비트맵을 PNG로 변환하는 또 다른 변환 스트림을 통해 HTTP 응답 스트림을 파이핑합니다. 서비스 워커의
fetch
핸들러 내에 설치하면 AVIF와 같은 새로운 이미지 형식을 투명하게 폴리필할 수 있습니다.
브라우저 지원
ReadableStream 및 WritableStream
TransformStream
핵심 개념
다양한 유형의 스트림에 대해 자세히 설명하기 전에 몇 가지 핵심 개념을 소개하겠습니다.
청크
청크는 스트림에 쓰거나 스트림에서 읽는 단일 데이터 조각입니다. 모든 유형일 수 있으며 스트림에는 서로 다른 유형의 청크가 포함될 수도 있습니다. 대부분의 경우 청크는 특정 스트림의 가장 원자적인 데이터 단위가 아닙니다. 예를 들어 바이트 스트림에는 단일 바이트 대신 16KiB Uint8Array
단위로 구성된 청크가 포함될 수 있습니다.
읽을 수 있는 스트림
읽기 가능한 스트림은 읽을 수 있는 데이터 소스를 나타냅니다. 즉, 읽을 수 있는 스트림에서 데이터가 나옵니다. 구체적으로 읽기 가능한 스트림은 ReadableStream
클래스의 인스턴스입니다.
쓰기 가능한 스트림
쓰기 가능한 스트림은 데이터를 쓸 수 있는 데이터의 대상을 나타냅니다. 즉, 데이터가 쓰기 가능한 스트림으로 들어갑니다. 구체적으로 쓰기 가능한 스트림은 WritableStream
클래스의 인스턴스입니다.
스트림 변환
변환 스트림은 스트림 쌍으로 구성됩니다. 쓰기 가능한 스트림(쓰기 가능 측면)과 읽기 가능한 스트림(읽기 가능 측면)이 있습니다.
이것을 실제 상황에 비유하면 한 언어에서 다른 언어로 즉석에서 번역하는 동시 통역사와 같습니다.
변환 스트림에 특정한 방식으로 쓰기 가능한 측면에 쓰면 읽기 가능한 측면에서 읽을 수 있는 새 데이터가 생성됩니다. 구체적으로 writable
속성과 readable
속성이 있는 객체는 변환 스트림으로 사용할 수 있습니다. 하지만 표준 TransformStream
클래스를 사용하면 적절하게 얽힌 이러한 쌍을 더 쉽게 만들 수 있습니다.
파이프 체인
스트림은 주로 서로 파이프하여 사용됩니다. 읽기 가능한 스트림은 읽기 가능한 스트림의 pipeTo()
메서드를 사용하여 쓰기 가능한 스트림에 직접 파이프될 수 있으며, 읽기 가능한 스트림의 pipeThrough()
메서드를 사용하여 하나 이상의 변환 스트림을 통해 먼저 파이프될 수도 있습니다. 이러한 방식으로 함께 파이프된 스트림 집합을 파이프 체인이라고 합니다.
백프레셔
파이프 체인이 구성되면 청크가 파이프 체인을 통해 얼마나 빠르게 흘러야 하는지에 관한 신호가 전파됩니다. 체인의 단계에서 아직 청크를 수락할 수 없는 경우 파이프 체인을 통해 뒤로 신호를 전파하여 결국 원래 소스에 청크를 너무 빨리 생성하지 않도록 지시합니다. 이러한 흐름 정규화 프로세스를 역압이라고 합니다.
티잉
읽기 가능한 스트림은 tee()
메서드를 사용하여 티 (대문자 'T' 모양에서 이름을 따옴)할 수 있습니다.
이렇게 하면 스트림이 잠겨 더 이상 직접 사용할 수 없게 되지만, 독립적으로 사용할 수 있는 브랜치라는 두 개의 새 스트림이 생성됩니다.
스트림은 되감거나 다시 시작할 수 없으므로 티잉도 중요합니다. 자세한 내용은 나중에 설명합니다.
읽기 가능한 스트림의 메커니즘
읽기 가능한 스트림은 기본 소스에서 흐르는 ReadableStream
객체로 JavaScript에 표시되는 데이터 소스입니다. ReadableStream()
생성자는 지정된 핸들러에서 읽기 가능한 스트림 객체를 만들어 반환합니다. 기본 소스에는 두 가지 유형이 있습니다.
- 푸시 소스는 액세스한 경우 지속적으로 데이터를 푸시하며 스트림 액세스를 시작, 일시중지 또는 취소하는 것은 사용자의 몫입니다. 예로는 라이브 동영상 스트림, 서버 전송 이벤트, WebSocket이 있습니다.
- 풀 소스는 연결된 후 명시적으로 데이터를 요청해야 합니다. 예로는
fetch()
또는XMLHttpRequest
호출을 통한 HTTP 작업이 있습니다.
스트림 데이터는 청크라고 하는 작은 부분으로 순차적으로 읽습니다. 스트림에 배치된 청크는 대기열에 추가되었다고 합니다. 즉, 읽을 준비가 된 상태로 대기열에서 기다리고 있습니다. 내부 대기열은 아직 읽지 않은 청크를 추적합니다.
대기열 전략은 스트림이 내부 대기열의 상태에 따라 백프레셔를 다시 신호하는 방법을 결정하는 객체입니다. 대기열 전략은 각 청크에 크기를 할당하고 대기열에 있는 모든 청크의 총 크기를 최고 수위라고 하는 지정된 숫자와 비교합니다.
스트림 내부의 청크는 리더에 의해 읽힙니다. 이 리더는 데이터를 한 번에 한 청크씩 가져오므로 원하는 종류의 작업을 데이터에 실행할 수 있습니다. 리더와 함께 제공되는 기타 처리 코드를 소비자라고 합니다.
이 컨텍스트의 다음 구조를 컨트롤러라고 합니다. 각 읽기 가능한 스트림에는 이름에서 알 수 있듯이 스트림을 제어할 수 있는 연결된 컨트롤러가 있습니다.
한 번에 하나의 리더만 스트림을 읽을 수 있습니다. 리더가 생성되고 스트림을 읽기 시작하면(즉, 활성 리더가 되면) 스트림에 잠깁니다. 다른 리더가 스트림을 읽도록 하려면 다른 작업을 하기 전에 첫 번째 리더를 해제해야 합니다(스트림을 티할 수는 있음).
읽기 가능한 스트림 만들기
생성자 ReadableStream()
를 호출하여 읽기 가능한 스트림을 만듭니다.
생성자에는 생성된 스트림 인스턴스의 동작을 정의하는 메서드와 속성이 있는 객체를 나타내는 선택적 인수 underlyingSource
가 있습니다.
underlyingSource
여기서는 개발자가 정의한 다음 선택적 메서드를 사용할 수 있습니다.
start(controller)
: 객체가 생성될 때 즉시 호출됩니다. 이 메서드는 스트림 소스에 액세스하고 스트림 기능을 설정하는 데 필요한 다른 작업을 실행할 수 있습니다. 이 프로세스를 비동기식으로 실행해야 하는 경우 메서드는 성공 또는 실패를 알리는 프라미스를 반환할 수 있습니다. 이 메서드에 전달된controller
매개변수는ReadableStreamDefaultController
입니다.pull(controller)
: 더 많은 청크가 가져와질 때 스트림을 제어하는 데 사용할 수 있습니다. 청크의 스트림 내부 대기열이 가득 차지 않을 때 대기열이 최고 수위에 도달할 때까지 반복적으로 호출됩니다.pull()
호출 결과가 프로미스인 경우 해당 프로미스가 이행될 때까지pull()
가 다시 호출되지 않습니다. 프로미스가 거부되면 스트림에 오류가 발생합니다.cancel(reason)
: 스트림 소비자가 스트림을 취소할 때 호출됩니다.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
은 다음 메서드를 지원합니다.
ReadableStreamDefaultController.close()
은 연결된 스트림을 닫습니다.ReadableStreamDefaultController.enqueue()
은 연결된 스트림에 지정된 청크를 대기열에 추가합니다.ReadableStreamDefaultController.error()
로 인해 연결된 스트림과의 향후 상호작용에 오류가 발생합니다.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
ReadableStream()
생성자의 두 번째 인수(선택사항)는 queuingStrategy
입니다.
스트림의 대기열 전략을 선택적으로 정의하는 객체이며 다음 두 매개변수를 사용합니다.
highWaterMark
: 이 대기열 전략을 사용하는 스트림의 높은 워터마크를 나타내는 음수가 아닌 숫자입니다.size(chunk)
: 주어진 청크 값의 유한한 음수가 아닌 크기를 계산하여 반환하는 함수입니다. 결과는 적절한ReadableStreamDefaultController.desiredSize
속성을 통해 나타나는 배압을 결정하는 데 사용됩니다. 또한 기본 소스의pull()
메서드가 호출되는 시점을 관리합니다.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
getReader()
및 read()
메서드
읽을 수 있는 스트림에서 읽으려면 리더가 필요하며 이는 ReadableStreamDefaultReader
입니다.
ReadableStream
인터페이스의 getReader()
메서드는 리더를 만들고 스트림을 리더에 잠급니다. 스트림이 잠겨 있는 동안 이 스트림이 해제될 때까지 다른 리더를 획득할 수 없습니다.
ReadableStreamDefaultReader
인터페이스의 read()
메서드는 스트림의 내부 대기열에 있는 다음 청크에 대한 액세스를 제공하는 프로미스를 반환합니다. 스트림의 상태에 따라 결과를 사용하여 이행하거나 거부합니다. 다양한 가능성은 다음과 같습니다.
- 청크를 사용할 수 있는 경우 프로미스는
{ value: chunk, done: false }
형식의 객체로 처리됩니다. - 스트림이 닫히면 프로미스가
{ value: undefined, done: true }
형식의 객체로 이행됩니다. - 스트림에 오류가 발생하면 프라미스가 관련 오류와 함께 거부됩니다.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
locked
속성
읽기 가능한 스트림이 잠겨 있는지 확인하려면 ReadableStream.locked
속성에 액세스하면 됩니다.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
읽기 가능한 스트림 코드 샘플
아래 코드 샘플은 모든 단계가 작동하는 것을 보여줍니다. 먼저 underlyingSource
인수 (즉, TimestampSource
클래스)에 start()
메서드를 정의하는 ReadableStream
를 만듭니다.
이 메서드는 스트림의 controller
에 10초 동안 매초 타임스탬프를 enqueue()
하도록 지시합니다.
마지막으로 컨트롤러에 스트림을 close()
하라고 지시합니다. getReader()
메서드를 통해 리더를 만들고 스트림이 done
가 될 때까지 read()
를 호출하여 이 스트림을 사용합니다.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
비동기 반복
각 read()
루프 반복에서 스트림이 done
인지 확인하는 것은 가장 편리한 API가 아닐 수 있습니다.
다행히도 비동기 반복이라는 더 나은 방법이 곧 제공될 예정입니다.
for await (const chunk of stream) {
console.log(chunk);
}
오늘 비동기 반복을 사용하는 해결 방법은 폴리필을 사용하여 동작을 구현하는 것입니다.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
읽을 수 있는 스트림을 teeing
ReadableStream
인터페이스의 tee()
메서드는 현재 읽기 가능한 스트림을 분기하여 결과로 생성된 두 분기를 새 ReadableStream
인스턴스로 포함하는 2요소 배열을 반환합니다. 이렇게 하면 두 리더가 동시에 스트림을 읽을 수 있습니다. 예를 들어 서버에서 응답을 가져와 브라우저로 스트리밍하는 동시에 서비스 워커 캐시로 스트리밍하려는 경우 서비스 워커에서 이 작업을 수행할 수 있습니다. 응답 본문은 두 번 이상 사용할 수 없으므로 이를 위해서는 두 개의 사본이 필요합니다. 스트림을 취소하려면 결과로 생성된 두 브랜치를 모두 취소해야 합니다. 스트림을 티잉하면 일반적으로 기간 동안 스트림이 잠겨 다른 리더가 스트림을 잠글 수 없습니다.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
읽을 수 있는 바이트 스트림
바이트를 나타내는 스트림의 경우 바이트를 효율적으로 처리하기 위해 읽기 가능한 스트림의 확장 버전이 제공됩니다. 특히 복사본을 최소화합니다. 바이트 스트림을 사용하면 사용자 제공 버퍼(BYOB) 리더를 획득할 수 있습니다. 기본 구현은 WebSocket의 경우 문자열이나 배열 버퍼와 같은 다양한 출력을 제공할 수 있지만 바이트 스트림은 바이트 출력을 보장합니다. 또한 BYOB 리더는 안정성 이점이 있습니다. 버퍼가 분리되면 동일한 버퍼에 두 번 쓰지 않으므로 경합 상태를 방지할 수 있기 때문입니다. BYOB 리더는 버퍼를 재사용할 수 있으므로 브라우저에서 가비지 컬렉션을 실행해야 하는 횟수를 줄일 수 있습니다.
읽을 수 있는 바이트 스트림 만들기
ReadableStream()
생성자에 추가 type
매개변수를 전달하여 읽을 수 있는 바이트 스트림을 만들 수 있습니다.
new ReadableStream({ type: 'bytes' });
underlyingSource
읽을 수 있는 바이트 스트림의 기본 소스에는 조작할 ReadableByteStreamController
이 제공됩니다. ReadableByteStreamController.enqueue()
메서드는 값이 ArrayBufferView
인 chunk
인수를 사용합니다. ReadableByteStreamController.byobRequest
속성은 현재 BYOB 풀 요청을 반환하며, 풀 요청이 없는 경우 null을 반환합니다. 마지막으로 ReadableByteStreamController.desiredSize
속성은 제어된 스트림의 내부 대기열을 채우는 데 필요한 크기를 반환합니다.
queuingStrategy
ReadableStream()
생성자의 두 번째 인수(선택사항)는 queuingStrategy
입니다.
스트림의 대기열 전략을 선택적으로 정의하는 객체이며 하나의 매개변수를 사용합니다.
highWaterMark
: 이 대기열 전략을 사용하는 스트림의 최대치를 나타내는 음수가 아닌 바이트 수입니다. 이는 적절한ReadableByteStreamController.desiredSize
속성을 통해 나타나는 역압을 결정하는 데 사용됩니다. 또한 기본 소스의pull()
메서드가 호출되는 시점을 관리합니다.
getReader()
및 read()
메서드
그런 다음 mode
매개변수를 적절하게 설정하여 ReadableStreamBYOBReader
에 액세스할 수 있습니다.
ReadableStream.getReader({ mode: "byob" })
이를 통해 복사본을 방지하기 위해 버퍼 할당을 더 정확하게 제어할 수 있습니다. 바이트 스트림에서 읽으려면 ReadableStreamBYOBReader.read(view)
를 호출해야 합니다. 여기서 view
은 ArrayBufferView
입니다.
읽을 수 있는 바이트 스트림 코드 샘플
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
다음 함수는 임의로 생성된 배열을 효율적으로 제로 카피 읽을 수 있는 읽기 가능한 바이트 스트림을 반환합니다. 미리 결정된 청크 크기 1,024를 사용하는 대신 개발자가 제공한 버퍼를 채우려고 시도하여 완전한 제어가 가능합니다.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
쓰기 가능한 스트림의 메커니즘
쓰기 가능한 스트림은 데이터를 쓸 수 있는 대상이며 JavaScript에서는 WritableStream
객체로 표시됩니다. 이는 기본 싱크(원시 데이터가 기록되는 하위 수준 I/O 싱크) 위에 있는 추상화 역할을 합니다.
데이터는 작성기를 통해 스트림에 한 번에 한 청크씩 작성됩니다. 청크는 리더의 청크와 마찬가지로 다양한 형태를 취할 수 있습니다. 원하는 코드를 사용하여 작성할 준비가 된 청크를 생성할 수 있습니다. 작성자와 연결된 코드를 생성자라고 합니다.
작성자가 생성되고 스트림에 쓰기를 시작하면 (활성 작성자) 스트림에 잠긴 것으로 간주됩니다. 한 번에 하나의 작성자만 쓰기 가능한 스트림에 쓸 수 있습니다. 다른 작성자가 스트림에 쓰기를 시작하도록 하려면 일반적으로 스트림을 해제한 후 다른 작성자를 스트림에 연결해야 합니다.
내부 큐는 스트림에 작성되었지만 아직 기본 싱크에서 처리되지 않은 청크를 추적합니다.
대기열 전략은 스트림이 내부 대기열의 상태에 따라 백프레셔를 다시 신호하는 방법을 결정하는 객체입니다. 대기열 전략은 각 청크에 크기를 할당하고 대기열에 있는 모든 청크의 총 크기를 상한선이라고 하는 지정된 숫자와 비교합니다.
최종 구조를 컨트롤러라고 합니다. 각 쓰기 가능 스트림에는 스트림을 제어할 수 있는 연결된 컨트롤러가 있습니다 (예: 스트림 중단).
쓰기 가능한 스트림 만들기
Streams API의 WritableStream
인터페이스는 싱크라고 하는 대상에 스트리밍 데이터를 쓰기 위한 표준 추상화를 제공합니다. 이 객체에는 백프레셔와 대기열이 내장되어 있습니다. 생성자 WritableStream()
를 호출하여 쓰기 가능한 스트림을 만듭니다.
여기에는 생성된 스트림 인스턴스의 동작을 정의하는 메서드와 속성이 있는 객체를 나타내는 선택적 underlyingSink
매개변수가 있습니다.
underlyingSink
underlyingSink
에는 다음 선택적 개발자 정의 메서드가 포함될 수 있습니다. 일부 메서드에 전달되는 controller
매개변수는 WritableStreamDefaultController
입니다.
start(controller)
: 이 메서드는 객체가 생성될 때 즉시 호출됩니다. 이 메서드의 콘텐츠는 기본 싱크에 액세스하는 것을 목표로 해야 합니다. 이 프로세스를 비동기식으로 실행해야 하는 경우 성공 또는 실패를 알리는 프라미스를 반환할 수 있습니다.write(chunk, controller)
: 이 메서드는 새로운 데이터 청크 (chunk
매개변수에 지정됨)가 기본 싱크에 기록될 준비가 되면 호출됩니다. 쓰기 작업의 성공 또는 실패를 알리는 프라미스를 반환할 수 있습니다. 이 메서드는 이전 쓰기가 성공한 후에만 호출되며 스트림이 닫히거나 중단된 후에는 호출되지 않습니다.close(controller)
: 앱이 스트림에 청크 쓰기를 완료했다고 신호를 보내면 이 메서드가 호출됩니다. 콘텐츠는 기본 싱크에 대한 쓰기를 완료하고 싱크에 대한 액세스를 해제하는 데 필요한 작업을 실행해야 합니다. 이 프로세스가 비동기인 경우 성공 또는 실패를 알리는 약속을 반환할 수 있습니다. 이 메서드는 대기열에 있는 모든 쓰기가 성공한 후에만 호출됩니다.abort(reason)
: 앱이 스트림을 갑자기 닫고 오류 상태로 전환하겠다고 신호를 보내면 이 메서드가 호출됩니다.close()
와 마찬가지로 보유된 리소스를 정리할 수 있지만 쓰기가 대기열에 추가된 경우에도abort()
가 호출됩니다. 이러한 청크는 삭제됩니다. 이 프로세스가 비동기인 경우 성공 또는 실패를 알리는 프라미스를 반환할 수 있습니다.reason
매개변수에는 스트림이 중단된 이유를 설명하는DOMString
이 포함됩니다.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
스트림 API의 WritableStreamDefaultController
인터페이스는 설정 중에, 쓰기를 위해 더 많은 청크가 제출될 때 또는 쓰기 끝에 WritableStream
의 상태를 제어할 수 있는 컨트롤러를 나타냅니다. WritableStream
를 생성할 때 기본 싱크에 조작할 해당 WritableStreamDefaultController
인스턴스가 제공됩니다. WritableStreamDefaultController
에는 WritableStreamDefaultController.error()
라는 메서드가 하나만 있으며, 이 메서드는 연결된 스트림과의 향후 상호작용에서 오류가 발생하도록 합니다.
WritableStreamDefaultController
는 AbortSignal
인스턴스를 반환하는 signal
속성도 지원하므로 필요한 경우 WritableStream
작업을 중지할 수 있습니다.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
WritableStream()
생성자의 두 번째 인수(선택사항)는 queuingStrategy
입니다.
스트림의 대기열 전략을 선택적으로 정의하는 객체이며 다음 두 매개변수를 사용합니다.
highWaterMark
: 이 대기열 전략을 사용하는 스트림의 높은 워터마크를 나타내는 음수가 아닌 숫자입니다.size(chunk)
: 주어진 청크 값의 유한한 음수가 아닌 크기를 계산하여 반환하는 함수입니다. 결과는 적절한WritableStreamDefaultWriter.desiredSize
속성을 통해 나타나는 배압을 결정하는 데 사용됩니다.
getWriter()
및 write()
메서드
쓰기 가능한 스트림에 쓰려면 WritableStreamDefaultWriter
인 작성자가 필요합니다. WritableStream
인터페이스의 getWriter()
메서드는 WritableStreamDefaultWriter
의 새 인스턴스를 반환하고 스트림을 해당 인스턴스로 잠급니다. 스트림이 잠겨 있는 동안 현재 스트림이 해제될 때까지 다른 작성자를 획득할 수 없습니다.
WritableStreamDefaultWriter
인터페이스의 write()
메서드는 전달된 데이터 청크를 WritableStream
및 기본 싱크에 쓴 다음 쓰기 작업의 성공 또는 실패를 나타내는 프로미스를 반환합니다. '성공'의 의미는 기본 싱크에 따라 달라질 수 있습니다. 청크가 수락되었음을 나타낼 수 있지만 최종 목적지에 안전하게 저장되었음을 나타내지는 않습니다.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
locked
속성
쓰기 가능한 스트림이 잠겨 있는지 확인하려면 WritableStream.locked
속성에 액세스하면 됩니다.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
쓰기 가능한 스트림 코드 샘플
아래 코드 샘플은 모든 단계를 보여줍니다.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
읽기 가능한 스트림을 쓰기 가능한 스트림으로 파이프
읽기 가능한 스트림은 읽기 가능한 스트림의 pipeTo()
메서드를 통해 쓰기 가능한 스트림으로 파이프될 수 있습니다.
ReadableStream.pipeTo()
는 현재 ReadableStream
를 지정된 WritableStream
에 파이프하고 파이프 프로세스가 성공적으로 완료되면 이행되거나 오류가 발생하면 거부되는 프라미스를 반환합니다.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
변환 스트림 만들기
Streams API의 TransformStream
인터페이스는 변환 가능한 데이터 집합을 나타냅니다. 생성자 TransformStream()
를 호출하여 변환 스트림을 만듭니다. 그러면 지정된 핸들러에서 변환 스트림 객체가 생성되고 반환됩니다. TransformStream()
생성자는 transformer
을 나타내는 선택적 JavaScript 객체를 첫 번째 인수로 허용합니다. 이러한 객체는 다음 메서드를 포함할 수 있습니다.
transformer
start(controller)
: 이 메서드는 객체가 생성될 때 즉시 호출됩니다. 일반적으로 이는controller.enqueue()
을 사용하여 접두사 청크를 대기열에 추가하는 데 사용됩니다. 이러한 청크는 읽기 가능한 측면에서 읽히지만 쓰기 가능한 측면에 대한 쓰기에는 의존하지 않습니다. 이 초기 프로세스가 비동기식인 경우(예: 접두사 청크를 획득하는 데 약간의 노력이 필요한 경우) 함수는 성공 또는 실패를 알리는 프로미스를 반환할 수 있습니다. 거부된 프로미스는 스트림에 오류를 발생시킵니다. 발생한 예외는TransformStream()
생성자에 의해 다시 발생합니다.transform(chunk, controller)
: 이 메서드는 쓰기 가능한 측면에 원래 쓰여진 새 청크가 변환될 준비가 되면 호출됩니다. 스트림 구현은 이 함수가 이전 변환이 성공한 후에만 호출되고start()
가 완료되기 전이나flush()
가 호출된 후에는 호출되지 않음을 보장합니다. 이 함수는 변환 스트림의 실제 변환 작업을 실행합니다.controller.enqueue()
를 사용하여 결과를 대기열에 추가할 수 있습니다. 이를 통해 쓰기 가능한 쪽에 기록된 단일 청크가controller.enqueue()
가 호출된 횟수에 따라 읽기 가능한 쪽에 0개 또는 여러 개의 청크를 생성할 수 있습니다. 변환 프로세스가 비동기인 경우 이 함수는 변환의 성공 또는 실패를 알리는 프로미스를 반환할 수 있습니다. 거부된 프라미스는 변환 스트림의 읽기 가능 및 쓰기 가능 측면 모두에서 오류를 발생시킵니다.transform()
메서드가 제공되지 않으면 쓰기 가능한 측면에서 읽기 가능한 측면으로 변경되지 않은 청크를 대기열에 추가하는 ID 변환이 사용됩니다.flush(controller)
: 이 메서드는 쓰기 가능한 쪽에 쓰인 모든 청크가transform()
을 통과하여 변환된 후 호출되며 쓰기 가능한 쪽이 닫히려고 합니다. 일반적으로 이는 읽기 가능한 측면이 닫히기 전에 여기에 접미사 청크를 대기열에 추가하는 데 사용됩니다. 플러시 프로세스가 비동기인 경우 함수는 성공 또는 실패를 알리는 프라미스를 반환할 수 있습니다. 결과는stream.writable.write()
호출자에게 전달됩니다. 또한 거부된 프라미스는 스트림의 읽기 가능 및 쓰기 가능 측면 모두에서 오류를 발생시킵니다. 예외를 발생시키는 것은 거부된 프로미스를 반환하는 것과 동일하게 처리됩니다.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
writableStrategy
및 readableStrategy
대기열 전략
TransformStream()
생성자의 두 번째 및 세 번째 선택적 매개변수는 선택적 writableStrategy
및 readableStrategy
대기열 전략입니다. 이는 각각 읽기 가능 및 쓰기 가능 스트림 섹션에 설명된 대로 정의됩니다.
변환 스트림 코드 샘플
다음 코드 샘플은 간단한 변환 스트림이 작동하는 모습을 보여줍니다.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
변환 스트림을 통해 읽을 수 있는 스트림 파이핑
ReadableStream
인터페이스의 pipeThrough()
메서드는 변환 스트림이나 기타 쓰기 가능/읽기 가능 쌍을 통해 현재 스트림을 파이프하는 연결 가능한 방법을 제공합니다. 스트림을 파이프하면 일반적으로 파이프 기간 동안 스트림이 잠겨 다른 리더가 스트림을 잠글 수 없습니다.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
다음 코드 샘플 (약간 인위적임)은 반환된 응답 프롬프트를 스트림으로 소비하고 청크별로 대문자로 변환하여 모든 텍스트를 대문자로 변환하는 fetch()
의 '외침' 버전을 구현하는 방법을 보여줍니다. 이 접근 방식의 장점은 전체 문서가 다운로드될 때까지 기다릴 필요가 없다는 것입니다. 이는 큰 파일을 처리할 때 큰 차이를 만들 수 있습니다.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
데모
아래 데모에서는 읽기 가능, 쓰기 가능, 변환 스트림이 작동하는 모습을 보여줍니다. 또한 pipeThrough()
및 pipeTo()
파이프 체인의 예시가 포함되어 있으며 tee()
도 보여줍니다. 원하는 경우 자체 창에서 데모를 실행하거나 소스 코드를 볼 수 있습니다.
브라우저에서 사용할 수 있는 유용한 스트림
브라우저에 바로 내장된 유용한 스트림이 많이 있습니다. 블롭에서 ReadableStream
를 쉽게 만들 수 있습니다. Blob
인터페이스의 stream() 메서드는 읽을 때 blob 내에 포함된 데이터를 반환하는 ReadableStream
를 반환합니다. File
객체는 특정 종류의 Blob
이며 blob이 사용될 수 있는 모든 컨텍스트에서 사용할 수 있습니다.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
TextDecoder.decode()
및 TextEncoder.encode()
의 스트리밍 변형은 각각 TextDecoderStream
및 TextEncoderStream
로 호출됩니다.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
CompressionStream
및 DecompressionStream
변환 스트림을 각각 사용하여 파일을 쉽게 압축하거나 압축 해제할 수 있습니다. 아래 코드 샘플은 스트림 사양을 다운로드하고 브라우저에서 바로 압축 (gzip)한 후 압축된 파일을 디스크에 직접 쓰는 방법을 보여줍니다.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
파일 시스템 액세스 API의 FileSystemWritableFileStream
및 실험용 fetch()
요청 스트림은 실제 쓰기 가능한 스트림의 예입니다.
Serial API는 읽기 가능 스트림과 쓰기 가능 스트림을 모두 많이 사용합니다.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
마지막으로 WebSocketStream
API는 스트림을 WebSocket API와 통합합니다.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
유용한 리소스
감사의 말씀
이 도움말은 제이크 아치볼드, 프랑수아 보포르, 샘 더튼, 마티아스 불렌스, Surma, 조 메들리, 애덤 라이스가 검토했습니다. 제이크 아치볼드의 블로그 게시물은 스트림을 이해하는 데 많은 도움이 되었습니다. 일부 코드 샘플은 GitHub 사용자 @bellbind의 탐색에서 영감을 받았으며, 산문의 일부는 스트림에 관한 MDN 웹 문서를 기반으로 합니다. 스트림 표준의 저자는 이 사양을 작성하는 데 엄청난 노력을 기울였습니다. 히어로 이미지는 Unsplash의 Ryan Lara가 제공했습니다.