Streams API ile okunabilir, yazılabilir ve dönüştürülebilir akışları nasıl kullanacağınızı öğrenin.
Streams API, ağ üzerinden alınan veya yerel olarak herhangi bir şekilde oluşturulan veri akışlarına programatik olarak erişmenize ve bunları JavaScript ile işlemenize olanak tanır. Akış, almak, göndermek veya dönüştürmek istediğiniz bir kaynağı küçük parçalara ayırmayı ve ardından bu parçaları bit bit işlemeyi içerir. Akış, web sayfalarında gösterilecek HTML veya video gibi öğeleri alan tarayıcıların zaten yaptığı bir işlem olsa da 2015'te akışlarla fetch
kullanıma sunulmadan önce bu özellik JavaScript'de kullanılamamıştı.
Daha önce, bir tür kaynağı (ör. video veya metin dosyası) işlemek istiyorsanız dosyanın tamamını indirmeniz, uygun bir biçime serileştirilmesini beklemeniz ve ardından dosyayı işlemeniz gerekiyordu. JavaScript'in akışları kullanabilmesiyle tüm bunlar değişiyor. Artık istemcide kullanılabilir hale gelir gelmez ham verileri JavaScript ile kademeli olarak işleyebilir, arabelleğe, dizeye veya blob'a gerek kalmaz. Bu, bazılarını aşağıda listelediğim çeşitli kullanım alanlarına olanak tanır:
- Video efektleri: Okunabilir bir video akışını, efektleri gerçek zamanlı olarak uygulayan bir dönüştürme akışı üzerinden aktarma.
- Veri sıkıştırma/sıkıştırma açma: Bir dosya akışının, seçici bir şekilde sıkıştırıldığı/sıkıştırıldığı bir dönüştürme akışı üzerinden aktarılması.
- Resim kodunun çözülmesi: Bir HTTP yanıtı akışının, baytların kodunun bitmap verilerine dönüştürüldüğü bir dönüştürme akışı ve ardından bitmap'leri PNG'ye çeviren başka bir dönüştürme akışı üzerinden aktarılması. Bir hizmet çalışanının
fetch
işleyicisine yüklenirse AVIF gibi yeni resim biçimlerini şeffaf bir şekilde doldurabilirsiniz.
Tarayıcı desteği
ReadableStream ve WritableStream
TransformStream
Temel kavramlar
Çeşitli yayın türleriyle ilgili ayrıntılara geçmeden önce bazı temel kavramları açıklamak isterim.
Büyük Parça
Bir parça, bir akışa yazılan veya bir akıştan okunan tek bir veri parçasıdır. Herhangi bir türde olabilir; hatta farklı türde parçalar içerebilir. Çoğu zaman, bir veri kümesi belirli bir akış için en atomik veri birimi olmaz. Örneğin, bir bayt akışı tek baytlar yerine 16 KiB Uint8Array
biriminden oluşan parçalar içerebilir.
Okunabilir akışlar
Okunabilir akış, okuyabileceğiniz bir veri kaynağını temsil eder. Diğer bir deyişle, veriler okunabilir bir akıştan çıkar. Daha açık belirtmek gerekirse, okunabilir akış, ReadableStream
sınıfının bir örneğidir.
Yazılabilir akışlar
Yazılabilir akış, veri yazabileceğimiz bir hedefi temsil eder. Başka bir deyişle, veriler yazılabilir bir akışa girer. Yazılabilir akış, WritableStream
sınıfının bir örneğidir.
Akışları dönüştürme
Dönüşüm akışı, bir çift akıştan oluşur: Yazılabilir tarafı olarak bilinen yazılabilir bir akış ve okunabilir tarafı olarak bilinen okunabilir bir akış.
Bu durumu gerçek dünyada bir metaforla açıklamak gerekirse, bir dilden diğerine anında çeviri yapan simultane çevirmen olarak örnek verilebilir.
Dönüşüm akışına özgü bir şekilde, yazılabilir tarafa yazma işlemi, okunabilir taraftan okunmaya hazır yeni verilerin sunulmasına neden olur. Daha açık belirtmek gerekirse, writable
ve readable
özelliğine sahip tüm nesneler dönüşüm akışı olarak kullanılabilir. Bununla birlikte, standart TransformStream
sınıfı, düzgün şekilde dolanmış böyle bir çift oluşturmayı kolaylaştırır.
Boru zincirleri
Akışlar, öncelikle birbirlerine bağlantı verilerek kullanılır. Okunabilir bir akış, okunabilir akışın pipeTo()
yöntemi kullanılarak doğrudan yazılabilir bir akışa bağlanabilir veya okunabilir akışın pipeThrough()
yöntemi kullanılarak önce bir veya daha fazla dönüşüm akışından geçirilebilir. Bu şekilde birleştirilmiş bir akış grubuna boru zinciri denir.
Geri basınç
Bir boru zinciri oluşturulduktan sonra, parçaların içinden ne kadar hızlı akması gerektiğine dair sinyaller yayılır. Zincirdeki herhangi bir adım henüz parçaları kabul edemiyorsa boru zinciri boyunca geriye doğru bir sinyal yayılır. Bu sinyal, sonunda orijinal kaynağa bu kadar hızlı parça üretmeyi bırakması söylenene kadar devam eder. Bu akışın normalleştirilmesi sürecine karşı basınç adı verilir.
Tişört
Okunabilir bir akış, tee()
yöntemi kullanılarak yan dallara ayrılabilir (büyük harfli "T" şeklinden dolayı bu şekilde adlandırılır).
Bu işlem, yayını kilitler, yani artık doğrudan kullanılamaz hale getirir. Ancak bağımsız olarak kullanılabilen iki yeni yayın (dal olarak adlandırılır) oluşturur.
Akışlar geri sarılamadığı veya yeniden başlatılamadığı için başlangıç noktası da önemlidir. Bu konu hakkında daha fazla bilgiyi aşağıda bulabilirsiniz.
Okunabilir bir yayının işleyiş şekli
Okunabilir akış, JavaScript'te temel bir kaynaktan akan bir ReadableStream
nesnesi tarafından temsil edilen bir veri kaynağıdır. ReadableStream()
sınıfının kurucusu, belirtilen işleyicilerden okunabilir bir akış nesnesi oluşturur ve döndürür. İki tür temel kaynak vardır:
- Aktarıcı kaynaklar, eriştiğinizde size sürekli olarak veri aktarır. Akışa erişimi başlatmak, duraklatmak veya iptal etmek ise size bağlıdır. Örnekler arasında canlı video akışları, sunucu tarafından gönderilen etkinlikler veya WebSocket'ler yer alır.
- Alma kaynakları, bağlandıktan sonra bu kaynaklardan açıkça veri istemenizi gerektirir. Buna örnek olarak
fetch()
veyaXMLHttpRequest
çağrıları aracılığıyla yapılan HTTP işlemleri verilebilir.
Akış verileri, parça adı verilen küçük parçalar halinde sırayla okunur. Bir akışa yerleştirilen parçaların sıraya eklenmiş olduğu söylenir. Bu, okunmaya hazır olarak bir sırada bekledikleri anlamına gelir. Dahili sıra, henüz okunmamış parçaları takip eder.
Sıraya ekleme stratejisi, bir akışın dahili sıranın durumuna göre karşı basıncı nasıl işaret edeceğini belirleyen bir nesnedir. Sıralama stratejisi her bir parçaya bir boyut atar ve sıradaki tüm parçaların toplam boyutunu maksimum değer olarak bilinen belirli bir sayıyla karşılaştırır.
Akıştaki parçalar bir okuyucu tarafından okunur. Bu okuyucu, verileri birer parça halinde alır ve üzerinde istediğiniz türde işlem yapmanıza olanak tanır. Okuyucu ve onunla birlikte gelen diğer işleme kodu tüketici olarak adlandırılır.
Bu bağlamda bir sonraki yapıya denetleyici denir. Her okunabilir akış, adından da anlaşılacağı gibi akışı kontrol etmenizi sağlayan ilişkili bir denetleyiciye sahiptir.
Bir akışı aynı anda yalnızca bir okuyucu okuyabilir. Bir okuyucu, oluşturulup okumaya başladığında (yani etkin bir okuyucu haline geldiğinde) akışa kilitlenir. Aktardığınız metni başka bir okuyucunun okumasını istiyorsanız genellikle başka bir işlem yapmadan önce ilk okuyucuyu bırakmanız gerekir (ancak aktarmaları bölebilirsiniz).
Okunabilir bir akış oluşturma
Oluşturucusunu çağırarak okunabilir bir akış oluşturursunuzReadableStream()
.
Oluşturucu, oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntem ve özelliklere sahip bir nesneyi temsil eden isteğe bağlı bir bağımsız değişkene underlyingSource
sahiptir.
underlyingSource
Bu işlem için geliştirici tarafından tanımlanan aşağıdaki isteğe bağlı yöntemler kullanılabilir:
start(controller)
: Nesne oluşturulduğunda hemen çağrılır. Yöntem, akış kaynağına erişebilir ve akış işlevini ayarlamak için gereken diğer her şeyi yapabilir. Bu işlem eşzamansız olarak yapılacaksa yöntem, başarı veya başarısızlığı bildirmek için bir promise döndürebilir. Bu yönteme iletilencontroller
parametresi birReadableStreamDefaultController
bağımsız değişkenidir.pull(controller)
: Daha fazla parça getirilirken yayını kontrol etmek için kullanılabilir. Akıştaki dahili parça kuyruğu dolu olmadığı sürece ve kuyruk en yüksek işaretine ulaşana kadar tekrar tekrar çağrılır.pull()
çağrılmasının sonucu bir sözse söz yerine getirilene kadarpull()
tekrar çağrılmaz. Sözleşme reddedilirse akışta hata oluşur.cancel(reason)
: Yayın tüketicisi, yayını iptal ettiğinde çağrılır.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
aşağıdaki yöntemleri destekler:
ReadableStreamDefaultController.close()
, ilişkili yayını kapatır.ReadableStreamDefaultController.enqueue()
, belirli bir parçayı ilişkili akışa ekler.ReadableStreamDefaultController.error()
, ilişkili akışla gelecekteki tüm etkileşimlerin hatayla sonuçlanmasına neden olur.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Benzer şekilde, ReadableStream()
kurucusunun ikinci bağımsız değişkeni queuingStrategy
şeklindedir.
Bu, isteğe bağlı olarak akış için iki parametre alan bir sıraya ekleme stratejisini tanımlayan bir nesnedir:
highWaterMark
: Bu sıraya ekleme stratejisinin kullanıldığı yayının en yüksek noktasını gösteren sıfırdan büyük bir sayı.size(chunk)
: Belirtilen parça değerinin sonlu ve negatif olmayan boyutunu hesaplayıp döndüren bir işlev. Sonuç, geri basıncı belirlemek için kullanılır ve uygunReadableStreamDefaultController.desiredSize
mülkü aracılığıyla gösterilir. Ayrıca, temel kaynağınpull()
yönteminin ne zaman çağrılacağını da belirler.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
getReader()
ve read()
yöntemleri
Okunabilir bir akıştan okumak için bir okuyucuya ihtiyacınız vardır. Bu okuyucu, ReadableStreamDefaultReader
olacaktır.
ReadableStream
arayüzünün getReader()
yöntemi bir okuyucu oluşturur ve akışı bu okuyucuya kilitler. Akış kilitliyken bu okuyucu serbest bırakılana kadar başka okuyucu edinilemez.
ReadableStreamDefaultReader
arayüzünün read()
yöntemi, akışın dahili sırasındaki bir sonraki parçaya erişim sağlayan bir taahhüt döndürür. Akış durumuna bağlı olarak bir sonuçla isteği yerine getirir veya reddeder. Olasılıklar şunlardır:
- Bir parça mevcutsa söz,
{ value: chunk, done: false }
biçiminde bir nesneyle yerine getirilir. - Akış kapatılırsa söz,
{ value: undefined, done: true }
biçiminde bir nesneyle yerine getirilir. - Akışta hata oluşursa söz konusu hata ile birlikte söz reddedilir.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
locked
mülkü
Okunabilir bir yayının kilitli olup olmadığını kontrol etmek için ReadableStream.locked
özelliğine erişebilirsiniz.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Okunabilir akış kod örnekleri
Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir. Önce, underlyingSource
bağımsız değişkeninde (yani TimestampSource
sınıfı) bir start()
yöntemi tanımlayan bir ReadableStream
oluşturursunuz.
Bu yöntem, yayının controller
'e on saniye boyunca her saniye bir zaman damgası enqueue()
göndermesini söyler.
Son olarak da akış için close()
komutunu gönderir. Bu akışı, getReader()
yöntemiyle bir okuyucu oluşturup akış done
olana kadar read()
çağrısı yaparak tüketirsiniz.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Eşzamansız iterasyon
Her read()
döngü iterasyonunda akış done
olup olmadığını kontrol etmek en uygun API olmayabilir.
Neyse ki yakında bunu yapmanın daha iyi bir yolu olacak: eşzamansız iterasyon.
for await (const chunk of stream) {
console.log(chunk);
}
Eş zamansız iterasyonu bugün kullanmak için bir geçici çözüm, davranışı bir polyfill ile uygulamaktır.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Okunabilir bir akış oluşturma
ReadableStream
arayüzünün tee()
yöntemi, mevcut okunabilir akışı ayırarak yeni ReadableStream
örnekleri olarak iki dal içeren iki öğeli bir dizi döndürür. Bu sayede iki okuyucu aynı anda bir yayını okuyabilir. Örneğin, sunucudan bir yanıt almak ve bunu tarayıcıya aktarmak, aynı zamanda hizmet çalışanı önbelleğine aktarmak istiyorsanız bunu bir hizmet çalışanında yapabilirsiniz. Yanıt gövdesi birden fazla kez kullanılamayacağından bunu yapmak için iki kopyaya ihtiyacınız vardır. Ardından, akışı iptal etmek için ortaya çıkan her iki dalı da iptal etmeniz gerekir. Bir yayını başlattığınızda yayın genellikle kilitlenir ve diğer okuyucular tarafından kilitlenmesi engellenir.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Okunabilir bayt akışları
Baytları temsil eden akışlar için, baytları verimli bir şekilde işlemek amacıyla (özellikle kopyaları en aza indirerek) okunabilir akışın genişletilmiş bir sürümü sağlanır. Bayt akışları, kendi arabelleğinizi getirme (BYOB) okuyucularının edinilmesine olanak tanır. Varsayılan uygulama, WebSocket'ler söz konusu olduğunda dize veya dizi arabellekleri gibi çeşitli farklı çıkışlar verebilir. Byte akışları ise bayt çıkışını garanti eder. Ayrıca, KOBİ okuyucularının kararlılıkla ilgili avantajları vardır. Bunun nedeni, bir arabelleğin ayrılması durumunda aynı arabelleğe iki kez yazılmayacağının garanti edilebilmesi ve böylece yarış koşullarının önlenebilmesidir. BYOB okuyucuları, tarayıcı tamponları yeniden kullanabileceği için atık toplama çalıştırması gereken süreyi azaltabilir.
Okunabilir bir bayt akışı oluşturma
ReadableStream()
oluşturucusuna ek bir type
parametresi ileterek okunabilir bir bayt akışı oluşturabilirsiniz.
new ReadableStream({ type: 'bytes' });
underlyingSource
Okunabilir bir bayt akışının temel kaynağına, üzerinde işlem yapmak için bir ReadableByteStreamController
verilir. Bu ReadableByteStreamController.enqueue()
yöntemi, değeri ArrayBufferView
olan bir chunk
bağımsız değişkeni alır. ReadableByteStreamController.byobRequest
özelliği, geçerli BYOB pull isteğini döndürür. İstek yoksa boş değer döndürür. Son olarak, ReadableByteStreamController.desiredSize
özelliği, kontrollü akışın dahili sırasını doldurmak için istenen boyutu döndürür.
queuingStrategy
Benzer şekilde, ReadableStream()
kurucusunun ikinci bağımsız değişkeni queuingStrategy
şeklindedir.
İsteğe bağlı olarak akış için bir sıra stratejisi tanımlayan ve bir parametre alan bir nesnedir:
highWaterMark
: Bu sıraya ekleme stratejisini kullanan yayının en yüksek noktasını gösteren sıfırdan büyük bir bayt sayısı. Bu, geri basıncı belirlemek için kullanılır ve uygunReadableByteStreamController.desiredSize
mülkü aracılığıyla gösterilir. Ayrıca, temel kaynağınpull()
yönteminin ne zaman çağrılacağını da belirler.
getReader()
ve read()
yöntemleri
Ardından, mode
parametresini uygun şekilde ayarlayarak ReadableStreamBYOBReader
'e erişebilirsiniz:
ReadableStream.getReader({ mode: "byob" })
. Bu sayede, kopyalardan kaçınmak için arabellek ayırma üzerinde daha hassas kontrol sağlanır. Bayt akışından okumak için ReadableStreamBYOBReader.read(view)
işlevini çağırmanız gerekir. Burada view
bir ArrayBufferView
bağımsız değişkenidir.
Okunabilir bayt akışı kod örneği
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Aşağıdaki işlev, rastgele oluşturulmuş bir dizinin sıfır kopyayla verimli bir şekilde okunmasına olanak tanıyan okunabilir bayt akışları döndürür. Önceden belirlenmiş 1.024 parça boyutunu kullanmak yerine, tam kontrol sağlamak için geliştirici tarafından sağlanan arabelleği doldurmaya çalışır.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Yazılabilir akışların işleyiş şekli
Yazılabilir akış, veri yazabileceğiniz bir hedeftir. JavaScript'te WritableStream
nesnesiyle gösterilir. Bu, alttaki havuzun, yani ham verilerin yazıldığı daha düşük seviyeli bir G/Ç havuzunun üst kısmı üzerinde bir soyutlama görevi görür.
Veriler, tek seferde bir parça olacak şekilde bir yazar aracılığıyla akışa yazılır. Bir parça, tıpkı bir okuyucudaki parçalar gibi birçok şekilde olabilir. Yazmaya hazır parçalar oluşturmak için istediğiniz kodu kullanabilirsiniz. Yazıcı ve ilişkili koda üretici denir.
Bir yazar oluşturulduğunda ve bir akışa yazmaya başladığında (etkin yazar), akışa kilitlendiği söylenir. Yazılabilir bir akışa aynı anda yalnızca bir yazar yazabilir. Başka bir yazarın akışınıza yazmaya başlamasını istiyorsanız genellikle yayını yayınlamanız ve ardından başka bir yazar eklemeniz gerekir.
Dahili bir sıra, akışa yazılmış ancak henüz temel alıcı tarafından işlenmemiş parçaları izler.
Sıraya ekleme stratejisi, bir akışın dahili sıranın durumuna göre karşı basıncı nasıl işaret edeceğini belirleyen bir nesnedir. Sıralama stratejisi her bir parçaya bir boyut atar ve sıradaki tüm parçaların toplam boyutunu maksimum değer olarak bilinen belirli bir sayıyla karşılaştırır.
Nihai yapıya denetleyici adı verilir. Her yazılabilir akışın, akışı kontrol etmenize (örneğin, iptal etmenize) olanak tanıyan ilişkili bir denetleyicisi vardır.
Yazılabilir akış oluşturma
Streams API'nin WritableStream
arayüzü, aktarma noktası olarak bilinen bir hedefe aktarma verileri yazmak için standart bir soyutlama sağlar. Bu nesne, yerleşik karşı basınç ve sıraya ekleme özelliğine sahiptir. Oluşturucusunu WritableStream()
çağırarak yazılabilir bir akış oluşturursunuz.
Oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntem ve özelliklere sahip bir nesneyi temsil eden isteğe bağlı bir underlyingSink
parametresi vardır.
underlyingSink
underlyingSink
, geliştirici tarafından tanımlanan aşağıdaki isteğe bağlı yöntemleri içerebilir. Bazı yöntemlere iletilen controller
parametresi WritableStreamDefaultController
şeklindedir.
start(controller)
: Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Bu yöntemin içeriği, temel lavaboya erişmeyi amaçlamalıdır. Bu işlem eşzamansız olarak yapılacaksa başarı veya başarısızlığı bildiren bir promise döndürülebilir.write(chunk, controller)
: Bu yöntem, yeni bir veri parçası (chunk
parametresinde belirtilir) temel alıcıya yazılmaya hazır olduğunda çağrılır. Yazma işleminin başarılı veya başarısız olduğunu belirtmek için bir promise döndürebilir. Bu yöntem yalnızca önceki yazma işlemleri başarılı olduktan sonra çağrılır ve hiçbir zaman akış kapatıldıktan veya iptal edildikten sonra çağrılmaz.close(controller)
: Uygulama, akışa parçalar yazmayı tamamladığını bildiriyorsa bu yöntem çağrılır. İçerikler, temeldeki havuza yazma işlemlerini tamamlamak ve bu havuza erişimi serbest bırakmak için gerekeni yapmalıdır. Bu süreç eşzamansızsa başarıya veya başarısızlığa işaret eden bir vaat döndürebilir. Bu yöntem yalnızca sıraya alınmış tüm yazma işlemleri başarılı olduktan sonra çağrılır.abort(reason)
: Bu yöntem, uygulama akışı aniden kapatmak ve işlemi hatalı duruma getirmek istediğini bildiriyorsa çağrılır.close()
gibi bekletilen kaynakları temizleyebilir ancak yazma işlemleri sıraya alınmış olsa bileabort()
çağrılır. Bu parçalar çöpe atılır. Bu işlem eşzamanlı değilse başarı veya başarısızlık sinyali vermek için bir promise döndürebilir.reason
parametresi, aktarımın neden iptal edildiğini açıklayan birDOMString
içerir.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Streams API'nin WritableStreamDefaultController
arayüzü, yazma işlemi için daha fazla parça gönderilirken veya yazma işleminin sonunda, kurulum sırasında WritableStream
durumunun kontrol edilmesine olanak tanıyan bir denetleyiciyi temsil eder. WritableStream
oluşturulurken temel havuza, manipüle edilmesi için karşılık gelen bir WritableStreamDefaultController
örneği verilir. WritableStreamDefaultController
için tek bir yöntem vardır: WritableStreamDefaultController.error()
bu yöntem, ilişkilendirilmiş akışla gelecekteki etkileşimlerde hataya neden olur.
WritableStreamDefaultController
, AbortSignal
örneği döndüren bir signal
özelliğini de destekler. Bu özellik, gerektiğinde WritableStream
işleminin durdurulmasına olanak tanır.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
WritableStream()
kurucusunun ikinci, yine isteğe bağlı bağımsız değişkeni queuingStrategy
'dur.
İsteğe bağlı olarak akış için bir sıra stratejisi tanımlayan bir nesnedir ve iki parametre alır:
highWaterMark
: Bu sıraya ekleme stratejisinin kullanıldığı yayının en yüksek noktasını gösteren sıfırdan büyük bir sayı.size(chunk)
: Belirtilen parça değerinin sonlu ve negatif olmayan boyutunu hesaplayıp döndüren bir işlev. Sonuç, geri basıncı belirlemek için kullanılır ve uygunWritableStreamDefaultWriter.desiredSize
mülkü aracılığıyla gösterilir.
getWriter()
ve write()
yöntemleri
Yazılabilir bir akışa yazmak için yazar gerekir. Bu da WritableStreamDefaultWriter
olacaktır. WritableStream
arayüzünün getWriter()
yöntemi, yeni bir WritableStreamDefaultWriter
örneği döndürür ve akışı bu örneğe kilitler. Akış kilitliyken mevcut yazar bırakılana kadar başka yazar edinilemez.
WritableStreamDefaultWriter
arayüzünün write()
yöntemi, geçirilen bir veri parçasını WritableStream
ile bunun temel havuzuna yazar, ardından yazma işleminin başarılı veya başarısız olduğunu belirten bir çözüm döndürür. "Başarı"nın ne anlama geldiğine bağlı olduğunu unutmayın. Bu, parçanın kabul edildiğini gösterebilir ancak nihai hedefine güvenli bir şekilde kaydedildiğini göstermeyebilir.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
locked
mülkü
Yazılabilir bir aktarımın kilitli olup olmadığını kontrol etmek için WritableStream.locked
mülküne erişebilirsiniz.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Yazılabilir akış kod örneği
Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Okunabilir bir akışı, yazılabilir bir akışa aktarma
Okunabilir bir akış, okunabilir akışın pipeTo()
yöntemi aracılığıyla yazılabilir bir akışa aktarılabilir.
ReadableStream.pipeTo()
, mevcut ReadableStream
öğesini belirli bir WritableStream
öğesine aktarır ve aktarma işlemi başarıyla tamamlandığında yerine getirilen veya herhangi bir hatayla karşılaşıldığında reddedilen bir promise döndürür.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Dönüşüm akışı oluşturma
Streams API'nin TransformStream
arayüzü, dönüştürülebilir bir veri grubunu temsil eder. Oluşturucu TransformStream()
'yi çağırarak bir dönüştürme akışı oluşturursunuz. Bu oluşturucu, belirtilen işleyicilerden bir dönüştürme akışı nesnesi oluşturup döndürür. TransformStream()
kurucusu, ilk bağımsız değişkeni olarak transformer
'yi temsil eden isteğe bağlı bir JavaScript nesnesi kabul eder. Bu tür nesneler, aşağıdaki yöntemlerin herhangi birini içerebilir:
transformer
start(controller)
: Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Genelliklecontroller.enqueue()
kullanılarak ön ek parçalarını sıraya eklemek için kullanılır. Bu parçalar okunabilir taraftan okunur ancak yazılabilir tarafa yapılan yazma işlemlerine bağlı değildir. Bu ilk işlem, örneğin ön ek parçalarını elde etmek biraz çaba gerektirdiği için eşzamanlı değilse işlev, başarı veya başarısızlığı bildirmek için bir söz döndürebilir. Reddedilen bir söz, aktarımda hata oluşturur. Atılan tüm istisnalar,TransformStream()
oluşturucusu tarafından yeniden atanır.transform(chunk, controller)
: Bu yöntem, orijinal olarak yazılabilir tarafa yazılan yeni bir parça dönüştürülmeye hazır olduğunda çağrılır. Akış uygulaması, bu işlevin yalnızca önceki dönüştürme işlemleri başarılı olduktan sonra çağrılacağını ve hiçbir zamanstart()
tamamlanmadan önce veyaflush()
çağrıldıktan sonra çağrılmayacağını garanti eder. Bu işlev, dönüştürme akışının gerçek dönüştürme işlemini gerçekleştirir. Sonuçlarıcontroller.enqueue()
kullanarak sıraya ekleyebilir. Bu, yazılabilir tarafa yazılan tek bir parçanın,controller.enqueue()
'ün kaç kez çağrıldığına bağlı olarak okunabilir tarafta sıfır veya birden fazla parçayla sonuçlanmasını sağlar. Dönüşüm işlemi eşzamanlı değilse bu işlev, dönüşümün başarılı veya başarısız olduğunu bildiren bir promise döndürebilir. Reddedilen bir söz, dönüştürme akışının hem okunabilir hem de yazılabilir tarafında hata verir. Hiçbirtransform()
yöntemi sağlanmazsa kimlik dönüştürme kullanılır. Bu yöntem, yazılabilir taraftan okunabilir tarafa değişmeden parçaları ekler.flush(controller)
: Bu yöntem, yazılabilir tarafa yazılan tüm parçalartransform()
üzerinden başarıyla geçirilerek dönüştürüldükten ve yazılabilir taraf kapatılmak üzereyken çağrılır. Bu genellikle, son ek parçalarını okunabilir tarafa eklemek için kullanılır. Boşaltma işlemi eşzamanlı değilse işlev, başarı veya başarısızlığı belirtmek için bir promise döndürebilir. Sonuç,stream.writable.write()
çağrısını yapan kullanıcıya iletilir. Ayrıca, reddedilen bir promise, aktarımın hem okunabilir hem de yazılabilir tarafında hata oluşturur. İstisna atma işlemi, reddedilen bir promise döndürmekle aynı şekilde değerlendirilir.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
writableStrategy
ve readableStrategy
sıra stratejileri
TransformStream()
oluşturucunun isteğe bağlı ikinci ve üçüncü parametreleri, isteğe bağlı writableStrategy
ve readableStrategy
sıraya alma stratejileridir. Bunlar sırasıyla readable ve writable akış bölümlerinde belirtildiği şekilde tanımlanır.
Dönüşüm akışı kod örneği
Aşağıdaki kod örneğinde, basit bir dönüştürme akışının işleyişi gösterilmektedir.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Okunabilir bir akışı dönüşüm akışı üzerinden ardışık olarak oluşturma
ReadableStream
arayüzünün pipeThrough()
yöntemi, mevcut akışı bir dönüştürme akışı veya başka bir yazılabilir/okunabilir çift üzerinden aktarmanın zincirlenebilir bir yolunu sağlar. Bir akışı boruya aktarmak genellikle akışı boru boyunca kilitler ve diğer okuyucuların kilitlemesini engeller.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Bir sonraki kod örneğinde (biraz abartılı) döndürülen yanıt vaadini bir akış olarak ve büyük/küçük harf ile parçayı kullanarak tüm metni büyük hale getiren fetch()
ürününün "sesli" sürümünü nasıl uygulayabileceğinizi görebilirsiniz. Bu yaklaşımın avantajı, dokümanın tamamının indirilmesini beklemeniz gerekmemesidir. Bu, büyük dosyalarla çalışırken büyük bir fark yaratabilir.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demo
Aşağıdaki demoda, okunabilir, yazılabilir ve dönüştürme akışları gösterilmektedir. Ayrıca pipeThrough()
ve pipeTo()
boru zincirlerine dair örnekler içerir ve tee()
'yi gösterir. İsterseniz demoyu kendi penceresinde çalıştırabilir veya kaynak kodunu görüntüleyebilirsiniz.
Tarayıcıda kullanılabilen faydalı akışlar
Doğrudan tarayıcıda yerleşik olarak bulunan çok sayıda kullanışlı akış vardır. Bir blob'dan kolayca ReadableStream
oluşturabilirsiniz. Blob
arayüzünün stream() yöntemi, okuma sonrasında blob içinde yer alan verileri döndüren bir ReadableStream
döndürür. Bir File
nesnesinin belirli bir Blob
türü olduğunu ve bir blobun yapabileceği herhangi bir bağlamda kullanılabileceğini de unutmayın.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
TextDecoder.decode()
ve TextEncoder.encode()
'un yayın varyantları sırasıyla TextDecoderStream
ve TextEncoderStream
olarak adlandırılır.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Sırasıyla CompressionStream
ve DecompressionStream
dönüştürme akışlarıyla dosyaları sıkıştırmak veya sıkıştırılmış dosyaları açmak kolaydır. Aşağıdaki kod örneğinde, Streams spesifikasyonunu nasıl indireceğiniz, doğrudan tarayıcıda nasıl sıkıştıracağınız (gzip) ve sıkıştırılmış dosyayı doğrudan diske nasıl yazacağınız gösterilmektedir.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
File System Access API'nin FileSystemWritableFileStream
ve deneysel fetch()
istek akışları, vahşi yaşamdaki yazılabilir akışlara örnektir.
Serial API, hem okunabilir hem de yazılabilir akışları yoğun bir şekilde kullanır.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Son olarak WebSocketStream
API, akışları WebSocket API ile entegre eder.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Faydalı kaynaklar
- Akışlar spesifikasyonu
- Eşlik eden demolar
- Akış polyfill'i
- 2016: web akışlarının yılı
- Eşzamansız iteratör ve üreteçler
- Akış Görselleştirici
Teşekkür ederiz
Bu makale, Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley ve Adam Rice tarafından incelendi. Jake Archibald'ın blog yayınları, yayınları anlamama çok yardımcı oldu. Kod örneklerinin bazıları GitHub kullanıcısı @bellbind'in keşiflerinden esinlenmiştir ve metnin bazı bölümleri Akışlar ile ilgili MDN Web Dokümanları'na dayanır. Akışlar Standardı'nın yazarları bu spesifikasyonu yazarken çok iyi bir iş çıkarmış. Unsplash'tan Ryan Lara tarafından oluşturulan lokomotif resim.