Streams API ile okunabilir, yazılabilir ve dönüştürülebilir akışları nasıl kullanacağınızı öğrenin.
Streams API, ağ üzerinden alınan veya yerel olarak herhangi bir yöntemle oluşturulan veri akışlarına programatik olarak erişmenize ve bunları JavaScript ile işlemenize olanak tanır. Akış; almak, göndermek veya dönüştürmek istediğiniz kaynağın küçük parçalara ayrılmasını ve ardından bu parçaların parça parça işlenmesini içerir. Web sayfalarında gösterilecek HTML veya videolar gibi öğeler alırken tarayıcıların yaptığı bir işlem olsa da bu özellik, akış içeren fetch
özelliği 2015'te kullanıma sunulmadan önce JavaScript'te kullanılabilir durumda değildi.
Önceden, bir tür kaynağı (video, metin dosyası vb.) işlemek istediğinizde, dosyanın tamamını indirmeniz, uygun bir biçimde seri haline getirilmesini beklemeniz ve ardından işlemeniz gerekiyordu. Akışlar JavaScript'in kullanılabildiğinde bu durum değişir. Artık ham verileri istemcide kullanıma sunulduğu kadar kısa sürede ve arabellek, dize veya blob oluşturmaya gerek kalmadan JavaScript ile kademeli olarak işleyebilirsiniz. Bu, bir dizi kullanım alanının kilidini açar. Bunlardan bazılarını aşağıda listeledik:
- Video efektleri: Okunabilir bir video akışını, gerçek zamanlı olarak efekt uygulayan bir dönüşüm akışı üzerinden bağlama.
- Veri (açma): Bir dosya akışını seçerek sıkıştıran (açma) bir dönüşüm akışı üzerinden bağlama.
- Resim kodu çözme: Baytların kodunu bit eşlem verilerine çözen bir dönüşüm akışı ve ardından bit eşlemleri PNG'lere dönüştüren başka bir dönüştürme akışı aracılığıyla bir HTTP yanıt akışı bağlama. Bir Service Worker'ın
fetch
işleyicisinin içine yüklenirse AVIF gibi yeni resim biçimlerini şeffaf bir şekilde çoklu doldurabilmenizi sağlar.
Tarayıcı desteği
ReadableStream ve WritableStream
TransformStream
Temel kavramlar
Çeşitli akış türlerinin ayrıntılarına geçmeden önce bazı temel kavramlardan bahsetmek istiyorum.
Büyük Parça
Yığın, bir akışa yazılan veya bir akıştan okunan tek bir veri parçasıdır. Bu akış herhangi bir türde olabilir. Akışlar, farklı türlerde parçalar bile içerebilir. Çoğu zaman yığın, belirli bir akış için en atomik veri birimi olmaz. Örneğin, bir bayt akışı, tek bayt yerine 16 KiB Uint8Array
biriminden oluşan parçalar içerebilir.
Okunabilir akışlar
Okunabilir bir akış, veri kaynağını okuyabildiğinizi gösterir. Başka bir deyişle, veriler okunabilir bir akıştan çıkar. Somut olarak, okunabilir bir akış ReadableStream
sınıfının bir örneğidir.
Yazılabilir akışlar
Yazılabilir akış, veriler için yazabileceğiniz bir hedefi temsil eder. Başka bir deyişle, veriler yazılabilir bir akışa aktarılır. Yazılabilir bir akış somut olarak, WritableStream
sınıfının bir örneğidir.
Akışları dönüştürme
Bir dönüşüm akışı, bir akış çiftinden oluşur: Yazılabilir tarafı olarak bilinen yazılabilir bir akış ve okunabilir tarafı olarak bilinen okunabilir bir akış.
Bunun için gerçek dünyadaki bir metafor, anında bir dilden diğerine çeviri yapan simultane çevirmen olabilir.
Dönüşüm akışına özel bir şekilde, yazılabilir tarafa yazmak yeni verilerin okunabilir taraftan okunmak üzere hazırlanmasına neden olur. Özetle, writable
özelliği ve readable
özelliğine sahip herhangi bir nesne dönüşüm akışı olarak hizmet verebilir. Bununla birlikte, standart TransformStream
sınıfı düzgün bir şekilde birbirine dolanmış böyle bir çift oluşturmayı kolaylaştırır.
Boru zincirleri
Akışlar temel olarak birbirine bağlama özelliğiyle kullanılır. Okunabilir bir akış, okunabilir akışın pipeTo()
yöntemi kullanılarak doğrudan yazılabilir bir akışa bağlanabilir veya okunabilir akışın pipeThrough()
yöntemi kullanılarak önce bir veya daha fazla dönüşüm akışından geçirilebilir. Bu şekilde birbirine bağlanan bir akış kümesi ardışık düzen zinciri olarak adlandırılır.
Ters Basınç
Bir boru zinciri oluşturulduktan sonra, parçaların içinden ne kadar hızlı akması gerektiğiyle ilgili sinyaller yayar. Zincirdeki herhangi bir adım henüz parçaları kabul edemiyorsa bu parça, boru zincirinden geriye doğru bir sinyal yayar ve en sonunda orijinal kaynağa parçalar oluşturmayı o kadar hızlı bir şekilde durdurması söyleninceye kadar devam eder. Bu akışı normalleştirme sürecine karşı basınç denir.
Diş teelemesi
Okunabilir bir akış, tee()
yöntemi kullanılarak "T" harfinin şekline göre isimlendirilebilir.
Bu işlem akışı kilitleyerek artık doğrudan kullanılamaz hale getirir ancak bağımsız olarak kullanılabilen dallar adı verilen iki yeni akış oluşturur.
Yayınlar geri sarılamadığı veya yeniden başlatılamayacağı için başlangıç vuruşu da önem taşır. Bu konu hakkında daha sonra bilgi vereceğiz.
Okunabilir bir akışın işleyişi
Okunabilir bir akış, JavaScript'te temel bir kaynaktan gelen ReadableStream
nesnesi tarafından temsil edilen bir veri kaynağıdır. ReadableStream()
oluşturucu, belirtilen işleyicilerden okunabilir bir akış nesnesi oluşturur ve döndürür. İki tür temel kaynak vardır:
- Eriştiğinizde aktarılan kaynaklar size sürekli olarak veri aktarır. Akışa erişimi başlatmak, duraklatmak veya iptal etmek size bağlıdır. Örnekler arasında canlı video akışları, sunucu tarafından gönderilen etkinlikler veya WebSockets yer alır.
- Pull kaynakları, bağlandıktan sonra bunlardan açıkça veri istemenizi gerektirir.
fetch()
veyaXMLHttpRequest
çağrıları aracılığıyla HTTP işlemleri buna örnek olarak gösterilebilir.
Akış verileri, parça adı verilen küçük parçalar halinde sırayla okunur. Akışa yerleştirilen parçaların sıralandığı söylenir. Yani okunmak için bir sırada bekliyorlar. Dahili sıra, henüz okunmamış olan parçaları takip eder.
Sıralama stratejisi, bir akışın dahili sırasının durumuna göre geri basınç sinyalini nasıl vermesi gerektiğini belirleyen nesnedir. Sıraya ekleme stratejisi, her parçaya bir boyut atar ve sıradaki tüm parçaların toplam boyutunu, yüksek su işareti olarak bilinen belirli bir sayıyla karşılaştırır.
Akış içindeki parçalar bir okuyucu tarafından okunur. Bu okuyucu, verileri tek seferde bir parça olarak alarak üzerinde istediğiniz işlemi gerçekleştirmenize imkan tanır. Okuyucu ve onunla birlikte gelen diğer işleme kodu tüketici olarak adlandırılır.
Bu bağlamdaki bir sonraki yapı denetleyici olarak adlandırılır. Okunabilir her akışın, adından da anlaşılacağı gibi akışı kontrol etmenize olanak tanıyan ilişkili bir denetleyicisi vardır.
Bir akışı aynı anda yalnızca bir okuyucu okuyabilir. Bir okuyucu oluşturulduğunda ve bir akışı okumaya başladığında (yani etkin okuyucu olduğunda) akışa kilitlenir. Yayınınızı başka bir okuyucunun okumayı devralmasını istiyorsanız genellikle başka bir şey yapmadan önce ilk okuyucuyu yayınlamanız gerekir (ancak akış başlatma edebilirsiniz).
Okunabilir bir akış oluşturma
Oluşturucuyu ReadableStream()
çağırarak okunabilir bir akış oluşturursunuz.
Oluşturucu, isteğe bağlı bir bağımsız değişkene sahiptir: underlyingSource
. Bu bağımsız değişken, oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntem ve özelliklere sahip bir nesneyi temsil eder.
underlyingSource
Bu işlev için geliştirici tarafından tanımlanan isteğe bağlı yöntemler kullanılabilir:
start(controller)
: Nesne oluşturulduğunda hemen çağrılır. Yöntem, akış kaynağına erişebilir ve akış işlevini ayarlamak için gereken diğer her şeyi yapabilir. Bu işlem eşzamansız olarak yapılacaksa yöntem, başarı veya başarısızlığı işaret etme sözü verebilir. Bu yönteme geçirilencontroller
parametresiReadableStreamDefaultController
değeridir.pull(controller)
: Daha fazla parça getirildikçe akışı kontrol etmek için kullanılabilir. Akışın dahili parçalardan oluşan sırası dolu olmadığı sürece, sıra en üst seviyeye ulaşana kadar tekrar tekrar çağrılır.pull()
çağrısının sonucu bir taahhütsepull()
sözü edilene kadar tekrar çağrılmaz. Vaat reddedilirse akışta hata oluşur.cancel(reason)
: Akış tüketicisi akışı iptal ettiğinde çağrılır.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
aşağıdaki yöntemleri destekler:
ReadableStreamDefaultController.close()
ilişkili akışı kapatır.ReadableStreamDefaultController.enqueue()
, belirli bir öbeği ilgili akışta sıraya alır.ReadableStreamDefaultController.error()
, ilişkili akışla gelecekte yapılacak etkileşimlerde hataya neden olur.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Aynı şekilde isteğe bağlı olarak ReadableStream()
kurucusunun ikinci bağımsız değişkeni queuingStrategy
bağımsız değişkenidir.
Bu, akış için isteğe bağlı olarak bir sıraya koyma stratejisi tanımlayan ve iki parametre alan bir nesnedir:
highWaterMark
: Bu sıralama stratejisini kullanan akışın yüksek su işaretini belirten negatif olmayan bir sayı.size(chunk)
: Belirli bir parçanın negatif olmayan sonlu boyutunu hesaplayıp döndüren bir işlev. Sonuç, uygunReadableStreamDefaultController.desiredSize
özelliği üzerinden hissedilen, karşı baskıyı belirlemek için kullanılır. Ayrıca, temel kaynağınpull()
yönteminin ne zaman çağrıldığını da yönetir.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
getReader()
ve read()
yöntemleri
Okunabilir bir akıştan okumak için ReadableStreamDefaultReader
boyutunda okuyucuya ihtiyacınız vardır.
ReadableStream
arayüzünün getReader()
yöntemi, bir okuyucu oluşturur ve akışı buna kilitler. Akış kilitli durumdayken, bu kitap yayınlanana kadar başka okuyucu edinilemez.
ReadableStreamDefaultReader
arayüzünün read()
yöntemi, akışın dahili sırasındaki bir sonraki parçaya erişim sağlama sözü döndürür. Akışın durumuna bağlı olarak bir sonuçla isteği karşılar veya reddeder. Farklı olasılıklar şunlardır:
- Parça varsa sözü şu biçimdeki bir nesne ile yerine getirilecektir:
{ value: chunk, done: false }
. - Akış kapatılırsa taahhüt şu biçimde bir nesne ile yerine getirilir:
{ value: undefined, done: true }
. - Akışta hata oluşursa söz konusu hatayla birlikte reddedilir.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
locked
özelliği
Okunabilir bir akışın kilitli olup olmadığını kontrol etmek için ReadableStream.locked
mülküne erişebilirsiniz.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Okunabilir akış kodu örnekleri
Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir. İlk olarak, underlyingSource
bağımsız değişkeninde (yani TimestampSource
sınıfı) bir start()
yöntemini tanımlayan bir ReadableStream
oluşturursunuz.
Bu yöntem, on saniyede bir, akışın controller
öğesine enqueue()
zaman damgası bildirir.
Son olarak da kumandaya akışı close()
talimatı verir. getReader()
yöntemiyle bir okuyucu oluşturup akış done
olana kadar read()
yöntemini çağırarak bu akışı kullanırsınız.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Eşzamansız yineleme
Akışın done
olup olmadığını her read()
döngüsü yinelemesini kontrol etmek en uygun API olmayabilir.
Neyse ki yakında bunu yapmanın daha iyi bir yolu olacak: eşzamansız yineleme.
for await (const chunk of stream) {
console.log(chunk);
}
Günümüzde eşzamansız yinelemeyi kullanmaya yönelik geçici bir çözüm, davranışı bir çoklu dolgu ile uygulamaktır.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Okunabilir bir akış videosu sunma
ReadableStream
arayüzünün tee()
yöntemi mevcut okunabilir akışı devreye sokar ve sonuçta ortaya çıkan iki dalı içeren iki öğeli bir diziyi yeni ReadableStream
örnekleri olarak döndürür. Bu, iki okuyucunun bir akışı aynı anda okumasına olanak tanır. Örneğin, sunucudan bir yanıt alıp tarayıcıya aktarmak, ancak aynı zamanda hizmeti Service Worker önbelleğine de aktarmak istiyorsanız bir Service Worker'da bunu yapabilirsiniz. Yanıt gövdesi bir defadan fazla kullanılamadığından bunu yapmak için iki kopyaya ihtiyacınız vardır. Akışı iptal etmek için sonuçta ortaya çıkan her iki dalı da iptal etmeniz gerekir. Bir akış için bağlantı oluşturduğunuzda, akış genellikle bu süre boyunca kilitlenir ve diğer okuyucular tarafından kilitlenmesi engellenir.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Okunabilir bayt akışları
Baytları temsil eden akışlarda, baytları verimli bir şekilde işlemek için özellikle kopyaları en aza indirerek okunabilir akışın genişletilmiş bir sürümü sağlanır. Bayt akışları, kendi arabelleğinizi getirme (BYOB) okuyucuların edinilmesini sağlar. Varsayılan uygulama, WebSockets'te dizeler veya dizi arabellekleri gibi farklı çıkışlar sağlayabilirken bayt akışları, bayt çıkışını garanti eder. Buna ek olarak, BYOB okuyucuların kararlılık avantajları vardır. Çünkü bir tamponun ayrılması durumunda tamponun aynı tampona iki kez yazılmaması sağlanır ve böylece yarış koşulları önlenir. BYOB okuyucuları, tarayıcının arabellekleri yeniden kullanabildiği için çöp toplama işlemi çalıştırma sayısını azaltabilir.
Okunabilir bir bayt akışı oluşturma
ReadableStream()
oluşturucuya ek bir type
parametresi ileterek okunabilir bir bayt akışı oluşturabilirsiniz.
new ReadableStream({ type: 'bytes' });
underlyingSource
Okunabilir bir bayt akışının temel kaynağına, değişiklik yapması için bir ReadableByteStreamController
verilir. ReadableByteStreamController.enqueue()
yöntemi, değeri ArrayBufferView
olan bir chunk
bağımsız değişkeni alır. ReadableByteStreamController.byobRequest
özelliği, geçerli BYOB pull isteğini veya hiç yoksa null döndürür. Son olarak ReadableByteStreamController.desiredSize
özelliği, kontrollü akışın dahili sırasını doldurmak için istenen boyutu döndürür.
queuingStrategy
Aynı şekilde isteğe bağlı olarak ReadableStream()
kurucusunun ikinci bağımsız değişkeni queuingStrategy
bağımsız değişkenidir.
Bu, akış için isteğe bağlı olarak bir sıraya koyma stratejisi tanımlayan ve bir parametre alan nesnedir:
highWaterMark
: Bu sıralama stratejisini kullanan akışın yüksek su işaretini belirten negatif olmayan bir bayt sayısı. Bu değer, karşı baskıyı belirlemek için kullanılır ve kendini uygunReadableByteStreamController.desiredSize
özelliği aracılığıyla gösterir. Ayrıca, temel kaynağınpull()
yönteminin ne zaman çağrıldığını da yönetir.
getReader()
ve read()
yöntemleri
Daha sonra, mode
parametresini uygun şekilde ayarlayarak ReadableStreamBYOBReader
için erişim elde edebilirsiniz:
ReadableStream.getReader({ mode: "byob" })
. Bu, kopyaları önlemek için tampon ayırma üzerinde daha hassas bir kontrol sağlar. Bayt akışından okumak için ReadableStreamBYOBReader.read(view)
işlevini çağırmanız gerekir. Burada view
, ArrayBufferView
değeridir.
Okunabilir bayt akışı kod örneği
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Aşağıdaki işlev, rastgele oluşturulmuş bir dizinin sıfır kopyayla verimli bir şekilde okunmasını sağlayan okunabilir bayt akışları döndürür. Önceden belirlenmiş 1.024 yığın boyutunu kullanmak yerine, tam kontrol sağlamak için geliştirici tarafından sağlanan arabelleği doldurmaya çalışır.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Yazılabilir akışın mekaniği
Yazılabilir akış, JavaScript'te WritableStream
nesnesiyle gösterilen veri yazabileceğiniz bir hedeftir. Bu, alttaki havuzun (ham verilerin yazıldığı daha düşük seviyeli bir G/Ç havuzu) üzerinde bir soyutlama işlevi görür.
Veriler, her defasında bir parça olmak üzere yazar aracılığıyla akışa yazılır. Bölümler, tıpkı bir okuyucudaki parçalar gibi çok çeşitli biçimlerde olabilir. Yazmaya hazır parçaları üretmek için istediğiniz kodu kullanabilirsiniz. Yazar ve ilişkili kod yapımcı olarak adlandırılır.
Bir yazar oluşturulduğunda ve bir akışa (etkin yazar) yazmaya başladığında onun bu içeriğe bağlı olduğu söylenir. Yazılabilir bir akışa aynı anda yalnızca bir yazar yazabilir. Yayınınıza başka bir yazarın yazmaya başlamasını istiyorsanız genellikle başka bir yazar eklemeden önce onu yayınlamanız gerekir.
Dahili sıra, akışa yazılan ancak henüz temel havuz tarafından işlenmemiş olan parçaları takip eder.
Sıralama stratejisi, bir akışın dahili sırasının durumuna göre geri basınç sinyalini nasıl vermesi gerektiğini belirleyen nesnedir. Sıraya ekleme stratejisi, her parçaya bir boyut atar ve sıradaki tüm parçaların toplam boyutunu, yüksek su işareti olarak bilinen belirli bir sayıyla karşılaştırır.
Nihai yapıya denetleyici adı verilir. Her yazılabilir akışın, akışı kontrol etmenize (örneğin, iptal etmek) olanak tanıyan ilişkili bir denetleyicisi vardır.
Yazılabilir bir akış oluşturma
Streams API'nin WritableStream
arayüzü, akış verilerini havuz olarak bilinen bir hedefe yazmak için standart bir soyutlama sağlar. Bu nesne, yerleşik karşı basınç ve sıralama içerir. Oluşturucuyu WritableStream()
çağırarak yazılabilir bir akış oluşturursunuz.
İsteğe bağlı underlyingSink
parametresi bulunur. Bu parametre, oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntemler ve özelliklere sahip bir nesneyi temsil eder.
underlyingSink
underlyingSink
, geliştirici tarafından tanımlanan aşağıdaki isteğe bağlı yöntemleri içerebilir. Yöntemlerden bazılarına iletilen controller
parametresi bir WritableStreamDefaultController
değeridir.
start(controller)
: Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Bu yöntemin içeriği, altta yatan havuza erişim sağlamayı amaçlamalıdır. Bu süreç eşzamansız olarak yapılacaksa başarı veya başarısızlık sinyali sağlayabilir.write(chunk, controller)
: Bu yöntem, yeni bir veri parçası (chunk
parametresinde belirtilir) temel havuza yazılmaya hazır olduğunda çağrılır. Yazma işleminin başarılı veya başarısız olduğunu işaret eden bir söz verebilir. Bu yöntem, yalnızca önceki yazmalar başarılı olduktan sonra çağrılır ve akış kapatıldıktan veya iptal edildikten sonra hiçbir zaman çağrılmaz.close(controller)
: Bu yöntem, uygulama, akışa parça yazmayı tamamladığını bildirirse çağrılır. İçerikler, alttaki havuza yazma işlemlerini sonlandırmak ve buna erişimi serbest bırakmak için gereken işlemi yapmalıdır. Bu süreç eşzamansızsa başarı veya başarısızlık sinyalini verebilir. Bu yöntem yalnızca sıraya alınan tüm yazma işlemleri başarılı olduktan sonra çağrılır.abort(reason)
: Bu yöntem, uygulama, akışı aniden kapatmak ve hatalı bir duruma sokmak istediğini belirtirse çağrılır.close()
gibi bekletilen kaynakları temizleyebilir ancak yazmalar sıraya alınmış olsa bileabort()
çağrılır. Bu parçalar atlanacak. Bu süreç eşzamansızsa başarı veya başarısızlık sinyali sağlayabilir.reason
parametresi, akışın neden iptal edildiğini açıklayan birDOMString
içerir.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Streams API'nin WritableStreamDefaultController
arayüzü, yazma için veya yazmanın sonunda daha fazla parça gönderildiğinden kurulum sırasında WritableStream
durumunun kontrol edilmesini sağlayan bir denetleyiciyi temsil eder. WritableStream
oluştururken altta yatan havuza değiştirebileceği karşılık gelen bir WritableStreamDefaultController
örneği verilir. WritableStreamDefaultController
öğesinin tek bir yöntemi vardır: WritableStreamDefaultController.error()
. Bu yöntem, ilişkili akışla gelecekte yapılacak etkileşimlerde hataya neden olur.
WritableStreamDefaultController
, AbortSignal
örneğini döndüren ve gerektiğinde WritableStream
işleminin durdurulmasına izin veren signal
özelliğini de destekler.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Aynı şekilde isteğe bağlı olarak WritableStream()
kurucusunun ikinci bağımsız değişkeni queuingStrategy
bağımsız değişkenidir.
Bu, akış için isteğe bağlı olarak bir sıraya koyma stratejisi tanımlayan ve iki parametre alan bir nesnedir:
highWaterMark
: Bu sıralama stratejisini kullanan akışın yüksek su işaretini belirten negatif olmayan bir sayı.size(chunk)
: Belirli bir parçanın negatif olmayan sonlu boyutunu hesaplayıp döndüren bir işlev. Sonuç, uygunWritableStreamDefaultWriter.desiredSize
özelliği üzerinden hissedilen, karşı baskıyı belirlemek için kullanılır.
getWriter()
ve write()
yöntemleri
Yazılabilir bir akışa yazmak için WritableStreamDefaultWriter
türünde bir yazara ihtiyacınız vardır. WritableStream
arayüzünün getWriter()
yöntemi, yeni WritableStreamDefaultWriter
örneğini döndürür ve akışı bu örneğe kilitler. Yayın kilitliyken, geçerli yayın yayınlanana kadar başka yazar edinilemez.
WritableStreamDefaultWriter
arayüzünün write()
yöntemi, geçirilen bir veri parçasını WritableStream
'a ve onun altındaki havuza yazar, ardından yazma işleminin başarılı veya başarısız olduğunu belirten bir vaat döndürür. "Başarı"nın ne anlama geldiğinin temelde olduğunu unutmayın. Bu, parçanın kabul edildiğini ve nihai hedefine güvenli bir şekilde kaydedildiğini gösterebilir.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
locked
özelliği
Yazılabilir bir akışın kilitli olup olmadığını kontrol etmek için ilgili akışın WritableStream.locked
özelliğine erişebilirsiniz.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Yazılabilir akış kodu örneği
Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Okunabilir bir akışı yazılabilir bir akışa bağlama
Okunabilir bir akış, okunabilir akışın pipeTo()
yöntemi aracılığıyla yazılabilir bir akışa aktarılabilir.
ReadableStream.pipeTo()
, mevcut ReadableStream
değerini belirli bir WritableStream
öğesine bağlar ve bağlantı işlemi başarıyla tamamlandığında yerine getirilecek veya hatalarla karşılaşılırsa reddedilecek bir söz döndürür.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Dönüşüm akışı oluşturma
Streams API'nin TransformStream
arayüzü, dönüştürülebilir bir veri kümesini temsil eder. Dönüşüm akışı oluşturmak için kurucuyu TransformStream()
çağırın, bu işlem belirtilen işleyicilerden bir dönüşüm akışı nesnesi oluşturur ve döndürür. TransformStream()
oluşturucusu, ilk bağımsız değişkeni olarak transformer
öğesini temsil eden isteğe bağlı bir JavaScript nesnesini kabul eder. Bu tür nesneler, aşağıdaki yöntemlerin herhangi birini içerebilir:
transformer
start(controller)
: Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Bu, genelliklecontroller.enqueue()
kullanılarak ön ek parçalarını sıraya koymak için kullanılır. Bu parçalar, okunabilir taraftan okunur ancak yazılabilir tarafa herhangi bir yazma işlemi gerektirmez. Bu başlangıç işlemi eşzamansızsa (örneğin, ön ek parçalarını edinmek için biraz çaba sarf etmesi nedeniyle) işlev, başarı veya başarısızlık sinyalini verecek bir söz verebilir; reddedilen bir söz ise akışta hataya neden olur. Atılan istisnalar,TransformStream()
oluşturucusu tarafından yeniden atanır.transform(chunk, controller)
: Bu yöntem, başlangıçta yazılabilir tarafa yazılan yeni bir parça dönüştürülmeye hazır olduğunda çağrılır. Akış uygulaması, bu işlevin yalnızca önceki dönüşümler başarılı olduktan sonra çağrılacağını vestart()
tamamlanmadan veyaflush()
çağrıldıktan sonra hiçbir zaman çağrılacağını garanti eder. Bu işlev, dönüşüm akışının asıl dönüştürme işlemini gerçekleştirir.controller.enqueue()
işlevini kullanarak sonuçları sıraya alabilir. Bu, yazılabilir tarafa yazılan tek bir parçanın,controller.enqueue()
işlevinin çağrılma sayısına bağlı olarak okunabilir tarafta sıfır veya birden fazla parçayla sonuçlanmasına izin verir. Dönüşüm süreci eşzamansızsa bu işlev, dönüşümün başarılı veya başarısız olduğunun sinyalini verecek bir vaat getirebilir. Reddedilen taahhüt, dönüşüm akışının hem okunabilir hem de yazılabilir taraflarında hataya neden olur.transform()
yöntemi sağlanmazsa kimlik dönüşümü kullanılır. Bu işlem, parçaları yazılabilir taraftan okunabilir tarafa doğru sıralar.flush(controller)
: Bu yöntem, yazılabilir tarafa yazılan tüm parçalartransform()
içinden başarılı bir şekilde geçirilerek dönüştürüldükten ve yazılabilir taraf kapatılmak üzereyken çağrılır. Tipik olarak bu, sonek parçaları kapanmadan önce okunabilecek tarafı kuyruğa almak için kullanılır. Temizleme işlemi eşzamansızsa işlev, başarılı veya başarısız olduğunu işaret etmek için bir vaat döndürebilir. Sonuç,stream.writable.write()
çağrısını yapana bildirilir. Buna ek olarak, reddedilen bir taahhüt, akışın hem okunabilir hem de yazılabilir taraflarında hataya neden olur. İstisna vermek, reddedilen bir sözü döndürmekle aynı şekilde değerlendirilir.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
writableStrategy
ve readableStrategy
sıralama stratejileri
TransformStream()
oluşturucunun ikinci ve üçüncü isteğe bağlı parametreleri, isteğe bağlı writableStrategy
ve readableStrategy
sıralama stratejileridir. Bunlar, sırasıyla okunabilir ve yazılabilir akış bölümlerinde özetlendiği şekilde tanımlanır.
Dönüşüm akışı kod örneği
Aşağıdaki kod örneğinde basit bir dönüşüm akışını çalışırken görebilirsiniz.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Bir dönüşüm akışı üzerinden okunabilir bir akışı bağlama
ReadableStream
arayüzünün pipeThrough()
yöntemi, mevcut akışı bir dönüşüm akışı veya başka bir yazılabilir/okunabilir çift aracılığıyla bağlamak için zincirlenebilir bir yol sağlar. Bir akışın ardışık düzeni, genellikle ardışık düzen süresince kilitleyerek diğer okuyucuların onu kilitlemesini önler.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Sıradaki kod örneğinde (biraz düşünülmüş), döndürülen yanıt sözünü akış olarak kullanıp yığına göre büyük harf kullanarak tüm metni büyük harfle yapan bir fetch()
"bağırma" sürümünü nasıl uygulayabileceğiniz gösterilmektedir. Bu yaklaşımın avantajı, tüm belgenin indirilmesini beklemenizin gerekmemesidir. Bu da büyük dosyalarla çalışırken büyük bir fark yaratabilir.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demografi
Aşağıdaki demoda okunabilir, yazılabilir ve dönüştürülebilir akışlar gösterilmektedir. Ayrıca, pipeThrough()
ve pipeTo()
boru zinciri örneklerini ve tee()
değerlerini de göstermektedir. İsteğe bağlı olarak demoyu kendi penceresinde çalıştırabilir veya kaynak kodu görüntüleyebilirsiniz.
Tarayıcıda faydalı yayınlar var
Tarayıcıda yerleşik olarak bulunan birçok faydalı akış vardır. Bir blob'tan kolayca ReadableStream
oluşturabilirsiniz. Blob
arayüzünün stream() yöntemi, okuma sonrasında blob içinde bulunan verileri döndüren bir ReadableStream
döndürür. Ayrıca File
nesnesinin belirli bir Blob
türü olduğunu ve bir blob'un yapabileceği her bağlamda kullanılabileceğini unutmayın.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
TextDecoder.decode()
ve TextEncoder.encode()
akış varyantları sırasıyla TextDecoderStream
ve TextEncoderStream
olarak adlandırılır.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
CompressionStream
ve DecompressionStream
dönüşüm akışlarıyla dosyaları sırayla sıkıştırmak veya açmak artık çok kolay. Aşağıdaki kod örneğinde, Akışlar spesifikasyonunu nasıl indirebileceğiniz, tarayıcı içinde nasıl sıkıştırabileceğiniz (gzip) ve sıkıştırılmış dosyayı doğrudan diske nasıl yazabileceğiniz gösterilmektedir.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
File System Access API'nin FileSystemWritableFileStream
ve deneysel fetch()
istek akışları, vahşi ortamdaki yazılabilir akışlara örnektir.
Serial API, hem okunabilir hem de yazılabilir akışları yoğun şekilde kullanır.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Son olarak WebSocketStream
API, akışları WebSocket API ile entegre eder.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Faydalı kaynaklar
- Akış spesifikasyonu
- Birlikte verilen demolar
- Akışlar çoklu dolgusu
- 2016 - web akışlarının yılı
- Eş zamansız yineleyiciler ve oluşturucular
- Canlı Yayın Görselleştirici
Teşekkür
Bu makale Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley ve Adam Rice tarafından incelenmiştir. Jake Archibald'ın blog yayınları akışları anlamama çok yardımcı oldu. Kod örneklerinden bazıları, GitHub kullanıcısının @bellbind adlı kullanıcının keşiflerinden esinlenilerek yazı tipinin ağırlıklı olarak Akışlar üzerindeki MDN Web Dokümanları'na dayanan parçalarıdır. Streams Standard'ın yazarları bu spesifikasyonu yazma konusunda büyük bir emek harcadı. Ryan Lara'nın Unsplash'teki hero resmi.