Streams API ile okunabilir, yazılabilir ve dönüştürülebilir akışları nasıl kullanacağınızı öğrenin.
Streams API, ağ üzerinden alınan veya yerel olarak herhangi bir şekilde oluşturulan veri akışlarına programatik olarak erişmenize ve bunları JavaScript ile işlemenize olanak tanır. Akış, almak, göndermek veya dönüştürmek istediğiniz bir kaynağı küçük parçalara ayırmayı ve ardından bu parçaları bit bit işlemeyi içerir. Akış, web sayfalarında gösterilecek HTML veya video gibi öğeleri alırken tarayıcıların zaten yaptığı bir işlemdir. Ancak 2015'te akışlarla fetch
kullanıma sunulmadan önce bu özellik JavaScript'te kullanılamamıştı.
Daha önce, bir tür kaynağı (ör. video veya metin dosyası) işlemek isterseniz dosyanın tamamını indirmeniz, uygun bir biçime serileştirilmesini beklemeniz ve ardından dosyayı işlemeniz gerekiyordu. JavaScript'in akışları kullanabilmesiyle tüm bunlar değişiyor. Artık istemcide kullanılabilir hale gelir gelmez ham verileri JavaScript ile kademeli olarak işleyebilir, arabelleğe, dizeye veya blob'a gerek kalmaz. Bu, bazılarını aşağıda listelediğim çeşitli kullanım alanlarına olanak tanır:
- Video efektleri: Okunabilir bir video akışını, efektleri gerçek zamanlı olarak uygulayan bir dönüştürme akışı üzerinden aktarma.
- Veri sıkıştırma/sıkıştırma açma: Bir dosya akışının, seçici bir şekilde sıkıştırma/sıkıştırma açma işlemi yapan bir dönüştürme akışı üzerinden aktarılması.
- Resim kodunun çözülmesi: Bir HTTP yanıtı akışının, baytların kodunun bitmap verilerine dönüştürüldüğü bir dönüştürme akışı ve ardından bitmap'lerin PNG'ye dönüştürüldüğü başka bir dönüştürme akışı üzerinden aktarılması. Bir hizmet çalışanının
fetch
işleyicisine yüklenirse AVIF gibi yeni resim biçimlerini şeffaf bir şekilde doldurabilirsiniz.
Tarayıcı desteği
ReadableStream ve WritableStream
TransformStream
Temel kavramlar
Çeşitli yayın türleriyle ilgili ayrıntılara geçmeden önce bazı temel kavramları açıklamak isterim.
Büyük Parça
Bir parça, bir akışa yazılan veya bir akıştan okunan tek bir veri parçasıdır. Herhangi bir türde olabilir; hatta farklı türde parçalar içerebilir. Çoğu zaman, bir veri parçası belirli bir akış için en atomik veri birimi olmaz. Örneğin, bir bayt akışı tek baytlar yerine 16 KiB Uint8Array
biriminden oluşan parçalar içerebilir.
Okunabilir akışlar
Okunabilir akış, okuyabileceğiniz bir veri kaynağını temsil eder. Diğer bir deyişle, veriler okunabilir bir akıştan çıkar. Daha açık belirtmek gerekirse, okunabilir akış, ReadableStream
sınıfının bir örneğidir.
Yazılabilir akışlar
Yazılabilir akış, veri yazabileceğimiz bir hedefi temsil eder. Diğer bir deyişle, veriler yazılabilir bir akışa girer. Yazılabilir akış, WritableStream
sınıfının bir örneğidir.
Akışları dönüştürme
Dönüşüm akışı, bir çift akıştan oluşur: Yazılabilir tarafı olarak bilinen yazılabilir bir akış ve okunabilir tarafı olarak bilinen okunabilir bir akış.
Bu durumu gerçek dünyada bir metaforla açıklamak gerekirse, bir dilden diğerine anında çeviri yapan simultane çevirmen olarak örnek verilebilir.
Dönüşüm akışına özgü bir şekilde, yazılabilir tarafa yazma işlemi, okunabilir taraftan okunmaya hazır yeni verilerin sunulmasına neden olur. Daha açık belirtmek gerekirse, writable
ve readable
özelliğine sahip tüm nesneler dönüşüm akışı olarak kullanılabilir. Ancak standart TransformStream
sınıfı, düzgün bir şekilde dolaşık olan böyle bir çift oluşturmayı kolaylaştırır.
Boru zincirleri
Akışlar, öncelikle birbirlerine bağlantı verilerek kullanılır. Okunabilir bir akış, okunabilir akışın pipeTo()
yöntemi kullanılarak doğrudan yazılabilir bir akışa aktarılabilir veya okunabilir akışın pipeThrough()
yöntemi kullanılarak önce bir veya daha fazla dönüştürme akışı üzerinden aktarılabilir. Bu şekilde birleştirilmiş bir akış grubuna boru zinciri denir.
Geri basınç
Bir boru zinciri oluşturulduktan sonra, parçaların içinden ne kadar hızlı akması gerektiğine dair sinyaller yayılır. Zincirdeki herhangi bir adım henüz parçaları kabul edemiyorsa boru zinciri boyunca geriye doğru bir sinyal yayılır. Bu sinyal, sonunda orijinal kaynağa bu kadar hızlı parça üretmeyi bırakması söylenene kadar devam eder. Bu akış normalleştirme işlemine geri basınç denir.
Teeing
Okunabilir bir akış, tee()
yöntemi kullanılarak yan dallara ayrılabilir (büyük harfli "T" şeklinden dolayı bu şekilde adlandırılır).
Bu işlem, yayını kilitler, yani artık doğrudan kullanılamaz hale getirir. Ancak bağımsız olarak kullanılabilen iki yeni yayın (dal olarak adlandırılır) oluşturur.
Akışlar geri sarılamadığı veya yeniden başlatılamadığı için başlangıç noktası da önemlidir. Bu konu hakkında daha fazla bilgiyi aşağıda bulabilirsiniz.
Okunabilir bir akış mekanizması
Okunabilir akış, JavaScript'te temel bir kaynaktan akan bir ReadableStream
nesnesi tarafından temsil edilen bir veri kaynağıdır. ReadableStream()
sınıfının kurucusu, belirtilen işleyicilerden okunabilir bir akış nesnesi oluşturur ve döndürür. İki tür temel kaynak vardır:
- Aktarıcı kaynaklar, eriştiğinizde size sürekli olarak veri aktarır. Akışa erişimi başlatmak, duraklatmak veya iptal etmek size bağlıdır. Canlı video yayınları, sunucu tarafından gönderilen etkinlikler veya WebSocket'ler buna örnek gösterilebilir.
- Alma kaynakları, bağlandıktan sonra bu kaynaklardan açıkça veri istemenizi gerektirir. Örnekler arasında
fetch()
veyaXMLHttpRequest
çağrıları aracılığıyla yapılan HTTP işlemleri yer alır.
Akış verileri, parça adı verilen küçük parçalar halinde sırayla okunur. Bir akışa yerleştirilen parçaların sıraya eklenmiş olduğu söylenir. Bu, okunmaya hazır olarak bir sırada bekledikleri anlamına gelir. Henüz okunmamış parçalar dahili bir kuyrukta tutulur.
Sıralama stratejisi, bir aktarımın dahili kuyruğunun durumuna göre geri basıncı nasıl bildirmesi gerektiğini belirleyen bir nesnedir. Sıralama stratejisi her bir parçaya bir boyut atar ve sıradaki tüm parçaların toplam boyutunu maksimum değer olarak bilinen belirli bir sayıyla karşılaştırır.
Akıştaki parçalar bir okuyucu tarafından okunur. Bu okuyucu, verileri birer parça halinde alır ve üzerinde istediğiniz türde işlem yapmanıza olanak tanır. Okuyucu ve onunla birlikte gelen diğer işleme koduna tüketici denir.
Bu bağlamda bir sonraki yapıya denetleyici denir. Okunabilir her akış, adından da anlaşılacağı gibi akışı kontrol etmenize olanak tanıyan ilişkili bir kontrolöre sahiptir.
Bir akış aynı anda yalnızca bir okuyucu tarafından okunabilir. Bir okuyucu oluşturulduğunda ve bir akışı okumaya başladığında (yani etkin okuyucu olduğunda) akışa kilitlenir. Aktardığınız metni başka bir okuyucunun okumasını istiyorsanız genellikle başka bir işlem yapmadan önce ilk okuyucuyu bırakmanız gerekir (ancak aktarmaları bölebilirsiniz).
Okunabilir bir akış oluşturma
Oluşturucusunu çağırarak okunabilir bir akış oluşturursunuzReadableStream()
.
Oluşturucu, oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntem ve özelliklere sahip bir nesneyi temsil eden isteğe bağlı bir bağımsız değişkene underlyingSource
sahiptir.
underlyingSource
Bu işlem için geliştirici tarafından tanımlanan aşağıdaki isteğe bağlı yöntemler kullanılabilir:
start(controller)
: Nesne oluşturulduğunda hemen çağrılır. Yöntem, akış kaynağına erişebilir ve akış işlevini ayarlamak için gereken her şeyi yapabilir. Bu işlem eşzamansız olarak yapılacaksa yöntem, başarı veya başarısızlığı bildirmek için bir promise döndürebilir. Bu yönteme iletilencontroller
parametresi birReadableStreamDefaultController
bağımsız değişkenidir.pull(controller)
: Daha fazla parça getirilirken yayını kontrol etmek için kullanılabilir. Akıştaki dahili parça kuyruğu dolu olmadığı sürece ve kuyruk en yüksek işaretine ulaşana kadar tekrar tekrar çağrılır.pull()
çağrılmasının sonucu bir sözse söz yerine getirilene kadarpull()
tekrar çağrılmaz. Sözleşme reddedilirse akışta hata oluşur.cancel(reason)
: Akış tüketicisi akışı iptal ettiğinde çağrılır.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
aşağıdaki yöntemleri destekler:
ReadableStreamDefaultController.close()
, ilişkili yayını kapatır.ReadableStreamDefaultController.enqueue()
, belirli bir parçayı ilişkili akışa ekler.ReadableStreamDefaultController.error()
, ilişkili akışla gelecekteki tüm etkileşimlerin hatayla sonuçlanmasına neden olur.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
ReadableStream()
kurucusunun ikinci, yine isteğe bağlı bağımsız değişkeni queuingStrategy
'dur.
İsteğe bağlı olarak akış için bir sıra stratejisi tanımlayan bir nesnedir ve iki parametre alır:
highWaterMark
: Bu sıraya ekleme stratejisinin kullanıldığı yayının en yüksek noktasını gösteren sıfırdan büyük bir sayı.size(chunk)
: Belirtilen parça değerinin sonlu ve negatif olmayan boyutunu hesaplayıp döndüren bir işlev. Sonuç, geri basıncı belirlemek için kullanılır ve uygunReadableStreamDefaultController.desiredSize
mülkü aracılığıyla gösterilir. Ayrıca, temel kaynağınpull()
yönteminin ne zaman çağrılacağını da belirler.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
getReader()
ve read()
yöntemleri
Okunabilir bir akıştan okumak için bir okuyucuya ihtiyacınız vardır. Bu okuyucu ReadableStreamDefaultReader
olacaktır.
ReadableStream
arayüzünün getReader()
yöntemi bir okuyucu oluşturur ve akışı bu okuyucuya kilitler. Akış kilitliyken bu okuyucu serbest bırakılana kadar başka okuyucu edinilemez.
ReadableStreamDefaultReader
arayüzünün read()
yöntemi, akıştaki dahili kuyrukta sonraki parçaya erişim sağlayan bir söz döndürür. Akış durumuna bağlı olarak bir sonuçla isteği yerine getirir veya reddeder. Olasılıklar şunlardır:
- Bir parça mevcutsa söz,
{ value: chunk, done: false }
biçiminde bir nesneyle yerine getirilir. - Akış kapatılırsa söz,
{ value: undefined, done: true }
biçiminde bir nesneyle yerine getirilir. - Akışta hata oluşursa söz konusu hata ile birlikte söz reddedilir.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
locked
mülkü
Okunabilir bir aktarımın kilitli olup olmadığını, ReadableStream.locked
mülküne erişerek kontrol edebilirsiniz.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Okunabilir akış kod örnekleri
Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir. Öncelikle, underlyingSource
bağımsız değişkeninde (yani TimestampSource
sınıfında) bir start()
yöntemi tanımlayan bir ReadableStream
oluşturursunuz.
Bu yöntem, yayının controller
özelliğine on saniye boyunca her saniye bir zaman damgası göndermesini söyler.enqueue()
Son olarak da akışa close()
vermesini söyler. Bu akışı, getReader()
yöntemi aracılığıyla bir okuyucu oluşturarak ve akış done
olana kadar read()
'u çağırarak tüketirsiniz.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Eşzamansız iterasyon
Her read()
döngü iterasyonunda akış done
olup olmadığını kontrol etmek en uygun API olmayabilir.
Neyse ki yakında bunu yapmanın daha iyi bir yolu olacak: eşzamansız iterasyon.
for await (const chunk of stream) {
console.log(chunk);
}
Eş zamansız iterasyonu bugün kullanmak için bir geçici çözüm, davranışı bir polyfill ile uygulamaktır.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Okunabilir bir akış oluşturma
ReadableStream
arayüzünün tee()
yöntemi, mevcut okunabilir akışı ayırarak yeni ReadableStream
örnekleri olarak iki dal içeren iki öğeli bir dizi döndürür. Bu sayede iki okuyucu aynı anda bir yayını okuyabilir. Örneğin, sunucudan bir yanıt almak ve bunu tarayıcıya aktarmak, aynı zamanda hizmet çalışanı önbelleğine aktarmak istiyorsanız bunu bir hizmet çalışanında yapabilirsiniz. Yanıt gövdesi birden fazla kez kullanılamayacağından bunu yapmak için iki kopyaya ihtiyacınız vardır. Ardından, akışı iptal etmek için ortaya çıkan her iki dalı da iptal etmeniz gerekir. Bir yayını başlattığınızda yayın genellikle kilitlenir ve diğer okuyucular tarafından kilitlenmesi engellenir.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Okunabilir bayt akışları
Baytları temsil eden akışlar için, baytları verimli bir şekilde işlemek amacıyla (özellikle kopyaları en aza indirerek) okunabilir akışın genişletilmiş bir sürümü sağlanır. Bayt akışları, kendi arabelleğinizi getirme (BYOB) okuyucularının edinilmesine olanak tanır. Varsayılan uygulama, WebSocket'ler söz konusu olduğunda dize veya dizi arabellekleri gibi çeşitli farklı çıkışlar verebilir. Byte akışları ise bayt çıkışını garanti eder. Ayrıca BYOB okuyucuları kararlılık avantajlarından da yararlanabilir. Bunun nedeni, bir arabellek ayrılırsa aynı arabelleğe iki kez yazılmayacağının garanti edilebilmesi ve böylece yarış koşullarının önlenebilmesidir. BYOB okuyucular, arabellekleri yeniden kullanabildiği için tarayıcının çöp toplama işlemini çalıştırması gereken süreyi azaltabilir.
Okunabilir bir bayt akışı oluşturma
ReadableStream()
oluşturucusuna ek bir type
parametresi ileterek okunabilir bir bayt akışı oluşturabilirsiniz.
new ReadableStream({ type: 'bytes' });
underlyingSource
Okunabilir bir bayt akışının temel kaynağına, üzerinde işlem yapmak için bir ReadableByteStreamController
verilir. ReadableByteStreamController.enqueue()
yöntemi, değeri ArrayBufferView
olan bir chunk
bağımsız değişkeni alır. ReadableByteStreamController.byobRequest
mülkü, mevcut BYOB çekme isteğini döndürür veya istekte bulunulmamışsa null değerini döndürür. Son olarak ReadableByteStreamController.desiredSize
mülkü, kontrol edilen akıştaki dahili kuyruğu doldurmak için istenen boyutu döndürür.
queuingStrategy
ReadableStream()
kurucusunun ikinci, yine isteğe bağlı bağımsız değişkeni queuingStrategy
'dur.
İsteğe bağlı olarak akış için bir sıra stratejisi tanımlayan ve bir parametre alan bir nesnedir:
highWaterMark
: Bu sıraya ekleme stratejisini kullanan yayının en yüksek noktasını gösteren sıfırdan büyük bir bayt sayısı. Bu, geri basıncı belirlemek için kullanılır ve uygunReadableByteStreamController.desiredSize
mülkü aracılığıyla gösterilir. Ayrıca, temel kaynağınpull()
yönteminin ne zaman çağrılacağını da belirler.
getReader()
ve read()
yöntemleri
Ardından, mode
parametresini uygun şekilde ayarlayarak ReadableStreamBYOBReader
'e erişebilirsiniz:
ReadableStream.getReader({ mode: "byob" })
. Bu sayede, kopyalardan kaçınmak için arabellek ayırma üzerinde daha hassas kontrol sağlanır. Bayt akışından okumak için ReadableStreamBYOBReader.read(view)
işlevini çağırmanız gerekir. Burada view
bir ArrayBufferView
bağımsız değişkenidir.
Okunabilir bayt akışı kod örneği
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Aşağıdaki işlev, rastgele oluşturulmuş bir dizinin sıfır kopyalamayla verimli bir şekilde okunmasına olanak tanıyan okunabilir bayt akışları döndürür. Önceden belirlenmiş 1.024 boyutunda bir parça yerine geliştirici tarafından sağlanan arabelleği doldurmaya çalışır ve böylece tam kontrol sağlar.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Yazılabilir akışların işleyiş şekli
Yazılabilir akış, JavaScript'te WritableStream
nesnesi ile temsil edilen, veri yazabileceğiniz bir hedeftir. Bu, ham verilerin yazıldığı alt düzey bir G/Ç havuzu olan temel havuzun üst kısmında bir soyutlama görevi görür.
Veriler, bir yazar aracılığıyla akışı tek tek parçalara yazar. Bir parça, tıpkı bir okuyucudaki parçalar gibi birçok şekilde olabilir. Yazmaya hazır parçalar oluşturmak için istediğiniz kodu kullanabilirsiniz. Yazıcı ve ilişkili koda üretici denir.
Bir yazar oluşturulduğunda ve bir akışa yazmaya başladığında (etkin yazar), akışa kilitlendiği söylenir. Yazılabilir bir akışa aynı anda yalnızca bir yazar yazabilir. Başka bir yazarın akışınıza yazmaya başlamasını istiyorsanız genellikle yayını yayınlamanız ve ardından başka bir yazar eklemeniz gerekir.
Dahili bir sıra, akışa yazılmış ancak henüz temel alıcı tarafından işlenmemiş parçaları izler.
Sıralama stratejisi, bir aktarımın dahili kuyruğunun durumuna göre geri basıncı nasıl bildirmesi gerektiğini belirleyen bir nesnedir. Sıralama stratejisi her bir parçaya bir boyut atar ve sıradaki tüm parçaların toplam boyutunu maksimum değer olarak bilinen belirli bir sayıyla karşılaştırır.
Nihai yapıya denetleyici adı verilir. Yazılabilir her akış, akışı kontrol etmenize (örneğin, iptal etmenize) olanak tanıyan ilişkili bir denetleyiciye sahiptir.
Yazılabilir akış oluşturma
Streams API'nin WritableStream
arayüzü, aktarma noktası olarak bilinen bir hedefe aktarılan verileri yazmak için standart bir soyutlama sağlar. Bu nesne, yerleşik geri basınç ve sıraya alma özelliğine sahiptir. Oluşturucusunu WritableStream()
çağırarak yazılabilir bir akış oluşturursunuz.
Oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntem ve özelliklere sahip bir nesneyi temsil eden isteğe bağlı bir underlyingSink
parametresi vardır.
underlyingSink
underlyingSink
, geliştirici tarafından tanımlanan aşağıdaki isteğe bağlı yöntemleri içerebilir. Bazı yöntemlere iletilen controller
parametresi bir WritableStreamDefaultController
öğesidir.
start(controller)
: Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Bu yöntemin içeriği, temel lavaboya erişmeyi amaçlamalıdır. Bu işlem eşzamansız olarak yapılacaksa başarı veya başarısızlığı bildiren bir promise döndürülebilir.write(chunk, controller)
: Bu yöntem, yeni bir veri parçası (chunk
parametresinde belirtilir) temel alıcıya yazılmaya hazır olduğunda çağrılır. Yazma işleminin başarılı veya başarısız olduğunu belirtmek için bir promise döndürebilir. Bu yöntem yalnızca önceki yazma işlemleri başarılı olduktan sonra çağrılır ve hiçbir zaman akış kapatıldıktan veya iptal edildikten sonra çağrılmaz.close(controller)
: Uygulama, akışa parça yazma işlemini tamamladığını bildirirse bu yöntem çağrılır. İçerik, temel havuza yapılan yazma işlemlerini tamamlamak ve havuza erişimi serbest bırakmak için gereken her şeyi yapmalıdır. Bu işlem eşzamanlı değilse başarı veya başarısızlık sinyali vermek için bir söz döndürebilir. Bu yöntem yalnızca sıraya alınmış tüm yazma işlemleri başarılı olduktan sonra çağrılır.abort(reason)
: Uygulama, akışı aniden kapatmak ve hatalı duruma geçirmek istediğini belirtirse bu yöntem çağrılır.close()
gibi tutulan tüm kaynakları temizleyebilir ancak yazma işlemleri sıraya alınmış olsa bileabort()
çağrılır. Bu parçalar atılır. Bu işlem eşzamanlı değilse başarı veya başarısızlık sinyali vermek için bir promise döndürebilir.reason
parametresi, aktarımın neden iptal edildiğini açıklayan birDOMString
içerir.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Streams API'nin WritableStreamDefaultController
arabiriminde, yazma işlemi için daha fazla parça gönderilirken veya yazma işleminin sonunda WritableStream
durumunun ayarlama sırasında kontrol edilmesine olanak tanıyan bir kontrolör bulunur. Bir WritableStream
oluşturulurken, temel havuza, üzerinde işlem yapılacak karşılık gelen bir WritableStreamDefaultController
örneği verilir. WritableStreamDefaultController
yalnızca bir yönteme sahiptir:
WritableStreamDefaultController.error()
. Bu yöntem, ilişkili akışla gelecekteki etkileşimlerin hatayla sonuçlanmasına neden olur.
WritableStreamDefaultController
, AbortSignal
örneği döndüren bir signal
özelliğini de destekler. Bu özellik, gerektiğinde WritableStream
işleminin durdurulmasına olanak tanır.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
WritableStream()
kurucusunun ikinci, yine isteğe bağlı bağımsız değişkeni queuingStrategy
'dur.
İsteğe bağlı olarak akış için bir sıra stratejisi tanımlayan bir nesnedir ve iki parametre alır:
highWaterMark
: Bu sıraya ekleme stratejisinin kullanıldığı yayının en yüksek noktasını gösteren sıfırdan büyük bir sayı.size(chunk)
: Belirtilen parça değerinin sonlu ve negatif olmayan boyutunu hesaplayıp döndüren bir işlev. Sonuç, geri basıncı belirlemek için kullanılır ve uygunWritableStreamDefaultWriter.desiredSize
mülkü aracılığıyla gösterilir.
getWriter()
ve write()
yöntemleri
Yazılabilir bir akışa yazmak için bir yazara (WritableStreamDefaultWriter
) ihtiyacınız vardır. WritableStream
arayüzünün getWriter()
yöntemi, yeni bir WritableStreamDefaultWriter
örneği döndürür ve akışı bu örneğe kilitler. Akış kilitliyken mevcut yazar bırakılana kadar başka yazar edinilemez.
WritableStreamDefaultWriter
arabiriminin write()
yöntemi, iletilen bir veri kümesini bir WritableStream
'e ve temelindeki havuza yazar, ardından yazma işleminin başarılı veya başarısız olduğunu belirten bir söz döndürür. "Başarılı" ifadesinin ne anlama geldiğinin temeldeki havuza bağlı olduğunu unutmayın. Bu ifade, parçanın kabul edildiğini belirtebilir ancak nihai hedefine güvenli bir şekilde kaydedildiğini göstermeyebilir.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
locked
mülkü
Yazılabilir bir aktarımın kilitli olup olmadığını kontrol etmek için WritableStream.locked
mülküne erişebilirsiniz.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Yazılabilir akış kod örneği
Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Okunabilir bir akışı, yazılabilir bir akışa aktarma
Okunabilir bir akış, okunabilir akışın pipeTo()
yöntemi aracılığıyla yazılabilir bir akışa aktarılabilir.
ReadableStream.pipeTo()
, mevcut ReadableStream
öğesini belirli bir WritableStream
öğesine aktarır ve aktarma işlemi başarıyla tamamlandığında yerine getirilen veya herhangi bir hatayla karşılaşıldığında reddedilen bir promise döndürür.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Dönüşüm akışı oluşturma
Streams API'nin TransformStream
arayüzü, dönüştürülebilir bir veri grubunu temsil eder. Bir dönüştürme akışı oluşturmak için TransformStream()
yapıcısını çağırırsınız. Bu yapıcı, belirtilen işleyicilerden bir dönüştürme akışı nesnesi oluşturup döndürür. TransformStream()
kurucusu, ilk bağımsız değişkeni olarak transformer
'yi temsil eden isteğe bağlı bir JavaScript nesnesi kabul eder. Bu tür nesneler aşağıdaki yöntemlerden herhangi birini içerebilir:
transformer
start(controller)
: Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Genelliklecontroller.enqueue()
kullanılarak ön ek parçalarını sıraya eklemek için kullanılır. Bu parçalar okunabilir taraftan okunur ancak yazılabilir tarafa yapılan yazma işlemlerine bağlı değildir. Bu ilk işlem, örneğin ön ek parçalarını elde etmek biraz çaba gerektirdiği için eşzamanlı değilse işlev, başarı veya başarısızlığı bildirmek için bir söz döndürebilir. Reddedilen bir söz, aktarımda hata oluşturur. Atılan tüm istisnalar,TransformStream()
kurucusu tarafından yeniden atılır.transform(chunk, controller)
: Bu yöntem, orijinal olarak yazılabilir tarafa yazılan yeni bir parça dönüştürülmeye hazır olduğunda çağrılır. Akış uygulaması, bu işlevin yalnızca önceki dönüştürme işlemleri başarılı olduktan sonra çağrılacağını ve hiçbir zamanstart()
tamamlanmadan önce veyaflush()
çağrıldıktan sonra çağrılmayacağını garanti eder. Bu işlev, dönüştürme akışının gerçek dönüştürme işlemini gerçekleştirir. Sonuçlarıcontroller.enqueue()
kullanarak sıraya ekleyebilir. Bu, yazılabilir tarafa yazılan tek bir parçanın,controller.enqueue()
'ün kaç kez çağrıldığına bağlı olarak okunabilir tarafta sıfır veya birden fazla parçayla sonuçlanmasını sağlar. Dönüşüm işlemi eşzamanlı değilse bu işlev, dönüşümün başarılı veya başarısız olduğunu bildiren bir promise döndürebilir. Reddedilen bir söz, dönüştürme akışının hem okunabilir hem de yazılabilir tarafında hata verir. Hiçbirtransform()
yöntemi sağlanmazsa kimlik dönüştürme kullanılır. Bu yöntem, yazılabilir taraftan okunabilir tarafa değişmeden parçaları ekler.flush(controller)
: Bu yöntem, yazılabilir tarafa yazılan tüm parçalartransform()
üzerinden başarıyla geçirilerek dönüştürüldükten ve yazılabilir taraf kapatılmak üzereyken çağrılır. Bu genellikle, son ek parçalarını okunabilir tarafa eklemek için kullanılır. Boşaltma işlemi eşzamanlı değilse işlev, başarı veya başarısızlığı belirtmek için bir promise döndürebilir. Sonuç,stream.writable.write()
çağrısını yapan kullanıcıya iletilir. Ayrıca, reddedilen bir promise, aktarımın hem okunabilir hem de yazılabilir tarafında hata oluşturur. İstisna atma işlemi, reddedilen bir promise döndürmekle aynı şekilde değerlendirilir.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
writableStrategy
ve readableStrategy
sıra stratejileri
TransformStream()
kurucusunun ikinci ve üçüncü isteğe bağlı parametreleri, isteğe bağlı writableStrategy
ve readableStrategy
sıra stratejileridir. Bunlar sırasıyla okunur ve yazılabilir akış bölümlerinde belirtildiği şekilde tanımlanır.
Dönüşüm akışı kod örneği
Aşağıdaki kod örneğinde, basit bir dönüştürme akışının işleyişi gösterilmektedir.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Okunabilir bir akışı dönüştürme akışı üzerinden aktarma
ReadableStream
arayüzünün pipeThrough()
yöntemi, mevcut akışı bir dönüştürme akışı veya başka bir yazılabilir/okunabilir çift üzerinden aktarmanın zincirlenebilir bir yolunu sağlar. Bir akışı boruya aktarmak genellikle akışı boru boyunca kilitler ve diğer okuyucuların kilitlemesini önler.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Aşağıdaki kod örneğinde (biraz yapay), döndürülen yanıt vaadini bir akış olarak tüketerek ve parça parça büyük harf kullanarak tüm metni büyük harf yapan fetch()
işlevinin "büyük harflerle yazma" sürümünü nasıl uygulayabileceğiniz gösterilmektedir. Bu yaklaşımın avantajı, dokümanın tamamının indirilmesini beklemeniz gerekmemesidir. Bu, büyük dosyalarla çalışırken büyük bir fark yaratabilir.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demo
Aşağıdaki demoda, okunabilir, yazılabilir ve dönüştürme akışları gösterilmektedir. Ayrıca pipeThrough()
ve pipeTo()
boru zinciri örnekleri ve tee()
gösterilmektedir. İsterseniz demoyu kendi penceresinde çalıştırabilir veya kaynak kodunu görüntüleyebilirsiniz.
Tarayıcıda kullanılabilen faydalı akışlar
Tarayıcıya yerleşik olarak birçok yararlı akış bulunur. Bir blob'dan kolayca ReadableStream
oluşturabilirsiniz. Blob
arayüzünün stream() yöntemi, okunması sonucunda blob içindeki verileri döndüren bir ReadableStream
döndürür. Ayrıca, File
nesnesinin belirli bir Blob
türü olduğunu ve bir blob'un kullanılabildiği her bağlamda kullanılabileceğini unutmayın.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
TextDecoder.decode()
ve TextEncoder.encode()
'un yayın varyantları sırasıyla TextDecoderStream
ve TextEncoderStream
olarak adlandırılır.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Sırasıyla CompressionStream
ve DecompressionStream
dönüştürme akışlarıyla dosyaları sıkıştırmak veya sıkıştırılmış dosyaları açmak kolaydır. Aşağıdaki kod örneğinde, Streams spesifikasyonunu nasıl indireceğiniz, doğrudan tarayıcıda nasıl sıkıştıracağınız (gzip) ve sıkıştırılmış dosyayı doğrudan diske nasıl yazacağınız gösterilmektedir.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
File System Access API'nin FileSystemWritableFileStream
ve deneysel fetch()
istek akışları, kullanımdaki yazılabilir akışlara örnek gösterilebilir.
Serial API, hem okunabilir hem de yazılabilir akışları yoğun bir şekilde kullanır.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Son olarak WebSocketStream
API, akışları WebSocket API ile entegre eder.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Faydalı kaynaklar
- Akışlar spesifikasyonu
- Eşlik eden demolar
- Akış polyfill'i
- 2016: web akışlarının yılı
- Eşzamansız iteratör ve üreteçler
- Akış Görselleştirici
Teşekkür ederiz
Bu makale, Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley ve Adam Rice tarafından incelendi. Jake Archibald'ın blog yayınları, yayınları anlamama çok yardımcı oldu. Kod örneklerinin bazıları GitHub kullanıcısı @bellbind'in keşiflerinden esinlenmiştir ve metnin bazı bölümleri Akışlar hakkında MDN Web Dokümanları'na dayanır. Akışlar Standardı'nın yazarları bu spesifikasyonu yazarken çok iyi bir iş çıkarmış. Unsplash'tan Ryan Lara tarafından oluşturulan lokomotif resim.