Streams API ile okunabilir, yazılabilir ve dönüştürülebilir akışları nasıl kullanacağınızı öğrenin.
Streams API, ağ üzerinden alınan veya yerel olarak oluşturulan veri akışlarına programatik olarak erişmenize ve bunları JavaScript ile işlemenize olanak tanır. Akış, almak, göndermek veya dönüştürmek istediğiniz bir kaynağın küçük parçalara ayrılmasını ve bu parçaların bit bit işlenmesini içerir. Tarayıcılar, web sayfalarında gösterilecek HTML veya videolar gibi öğeleri alırken zaten akış gerçekleştirir. Ancak bu özellik, 2015'te akışlar tanıtılana kadar JavaScript'te kullanılamıyordu.fetch
Daha önce, bir kaynağı (video veya metin dosyası vb.) işlemek istediğinizde dosyanın tamamını indirmeniz, uygun bir biçime seri durumdan çıkarılmasını beklemeniz ve ardından işlemeniz gerekiyordu. Ancak akışlar JavaScript'te kullanılabildiğinden bu durum değişiyor. Artık istemcide kullanılabilir hale gelir gelmez arabellek, dize veya blob oluşturmanıza gerek kalmadan ham verileri JavaScript ile aşamalı olarak işleyebilirsiniz. Bu, aşağıda listelediğim bazı kullanım alanlarının kilidini açar:
- Video efektleri: Efektleri gerçek zamanlı olarak uygulayan bir dönüştürme akışı aracılığıyla okunabilir bir video akışı oluşturma.
- Veri (sıkıştırma) açma: Bir dosya akışını, seçici olarak (sıkıştırma) açma işlemi yapan bir dönüştürme akışından geçirme.
- Resim kod çözme: Bir HTTP yanıtı akışını, baytların bit eşlem verilerine kodunu çözen bir dönüştürme akışından ve ardından bit eşlemleri PNG'lere çeviren başka bir dönüştürme akışından geçirme. Bir hizmet çalışanının
fetch
işleyicisine yüklenirse AVIF gibi yeni görüntü biçimlerini şeffaf bir şekilde doldurmanıza olanak tanır.
Tarayıcı desteği
ReadableStream ve WritableStream
TransformStream
Temel kavramlar
Çeşitli yayın türleri hakkında ayrıntılı bilgi vermeden önce bazı temel kavramları açıklayalım.
Büyük Parça
Parça, bir akışa yazılan veya akıştan okunan tek bir veri parçasıdır. Herhangi bir türde olabilir. Hatta akışlar farklı türlerdeki parçaları içerebilir. Çoğu zaman, bir parça belirli bir akış için en küçük veri birimi olmaz. Örneğin, bir bayt akışı tek baytlar yerine 16 KiB Uint8Array
birimlerinden oluşan parçalar içerebilir.
Okunabilir akışlar
Okunabilir bir akış, okuyabileceğiniz bir veri kaynağını temsil eder. Başka bir deyişle, veriler okunabilir bir akıştan çıkar. Okunabilir akış, ReadableStream
sınıfının bir örneğidir.
Yazılabilir akışlar
Yazılabilir akış, veri için yazabileceğiniz bir hedefi temsil eder. Diğer bir deyişle, veriler yazılabilir bir akışa girer. Daha net bir ifadeyle, yazılabilir akış, WritableStream
sınıfının bir örneğidir.
Akışları dönüştürme
Dönüştürme akışı, bir çift akıştan oluşur: yazılabilir tarafı olarak bilinen yazılabilir bir akış ve okunabilir tarafı olarak bilinen okunabilir bir akış.
Bunun gerçek dünyadaki metaforu, bir dilden diğerine anında çeviri yapan bir simultane çevirmendir.
Dönüştürme akışına özgü bir şekilde, yazılabilir tarafa yazma işlemi, okunabilir tarafta okunmak üzere yeni verilerin kullanılabilir hale gelmesiyle sonuçlanır. Dönüşüm akışı olarak writable
özelliği ve readable
özelliği olan tüm nesneler kullanılabilir. Ancak standart TransformStream
sınıfı, uygun şekilde iç içe geçmiş böyle bir çift oluşturmayı kolaylaştırır.
Boru zincirleri
Akışlar, öncelikli olarak birbirlerine yönlendirilerek kullanılır. Okunabilir bir akış, okunabilir akışın pipeTo()
yöntemi kullanılarak doğrudan yazılabilir bir akışa yönlendirilebilir veya okunabilir akışın pipeThrough()
yöntemi kullanılarak önce bir veya daha fazla dönüştürme akışından geçirilebilir. Bu şekilde birbirine bağlanmış akışlar kümesi, boru zinciri olarak adlandırılır.
Geri basınç
Bir boru zinciri oluşturulduktan sonra, parçaların içinden ne kadar hızlı akması gerektiğiyle ilgili sinyaller yayar. Zincirdeki herhangi bir adım henüz parçaları kabul edemiyorsa boru zinciri boyunca geriye doğru bir sinyal yayar. Sonunda orijinal kaynağa parçaları bu kadar hızlı üretmeyi bırakması söylenir. Bu akışı normalleştirme sürecine geri basınç denir.
Teeing
Okunabilir bir akış, tee()
yöntemi kullanılarak bölünebilir (büyük "T" harfinin şeklinden adını alır).
Bu işlem, akışı kilitler (yani doğrudan kullanılamaz hale getirir). Ancak, bağımsız olarak kullanılabilen dallar adı verilen iki yeni akış oluşturur.
Yayınlar geri sarılamadığı veya yeniden başlatılamadığı için teeing de önemlidir. Bu konuyla ilgili daha fazla bilgiyi sonraki bölümlerde bulabilirsiniz.
Okunabilir bir akışın mekaniği
Okunabilir akış, temel bir kaynaktan akan bir ReadableStream
nesnesiyle JavaScript'te temsil edilen bir veri kaynağıdır. The
ReadableStream()
yapıcı, belirtilen işleyicilerden okunabilir bir akış nesnesi oluşturur ve döndürür. İki tür temel kaynak vardır:
- Push kaynakları, eriştiğinizde sürekli olarak size veri gönderir. Akışa erişimi başlatmak, duraklatmak veya iptal etmek size bağlıdır. Canlı video akışları, sunucu tarafından gönderilen etkinlikler veya WebSocket'ler örnek olarak verilebilir.
- Çekme kaynakları, bağlandıktan sonra verileri açıkça istemenizi gerektirir.
fetch()
veyaXMLHttpRequest
çağrıları aracılığıyla yapılan HTTP işlemleri buna örnek verilebilir.
Akış verileri, parçalar adı verilen küçük birimler halinde sırayla okunur. Bir akışa yerleştirilen parçaların sıraya alındığı söylenir. Bu, okunmaya hazır bir şekilde sıraya alındıkları anlamına gelir. Dahili kuyruk, henüz okunmamış parçaları takip eder.
Sıraya alma stratejisi, bir akışın dahili sırasının durumuna göre nasıl geri basınç sinyali vermesi gerektiğini belirleyen bir nesnedir. Kuyruğa alma stratejisi, her bir parçaya bir boyut atar ve kuyruktaki tüm parçaların toplam boyutunu, yüksek su işareti olarak bilinen belirtilen bir sayıyla karşılaştırır.
Akışın içindeki parçalar bir okuyucu tarafından okunur. Bu okuyucu, verileri tek seferde bir parça halinde alır ve üzerinde istediğiniz işlemi yapmanıza olanak tanır. Okuyucu ve beraberindeki diğer işleme kodu tüketici olarak adlandırılır.
Bu bağlamdaki bir sonraki yapıya denetleyici adı verilir. Okunabilir her akışın, adından da anlaşılacağı gibi akışı kontrol etmenize olanak tanıyan ilişkili bir denetleyicisi vardır.
Aynı anda yalnızca bir okuyucu bir akışı okuyabilir. Bir okuyucu oluşturulup bir akışı okumaya başladığında (yani etkin okuyucu olduğunda) akış kilitlenir. Yayınınızı başka bir okuyucunun okumasını istiyorsanız genellikle başka bir işlem yapmadan önce ilk okuyucuyu serbest bırakmanız gerekir (ancak yayınları hazırlayabilirsiniz).
Okunabilir bir akış oluşturma
Yapılandırıcısını ReadableStream()
çağırarak okunabilir bir akış oluşturursunuz.
Yapılandırıcıda, oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntemler ve özellikler içeren bir nesneyi temsil eden isteğe bağlı bir bağımsız değişken underlyingSource
bulunur.
underlyingSource
Bu, aşağıdaki isteğe bağlı, geliştirici tanımlı yöntemleri kullanabilir:
start(controller)
: Nesne oluşturulduğunda hemen çağrılır. Yöntem, akış kaynağına erişebilir ve akış işlevini ayarlamak için gereken diğer tüm işlemleri yapabilir. Bu işlem eşzamansız olarak yapılacaksa yöntem, başarıyı veya başarısızlığı bildirmek için bir söz döndürebilir. Bu yönteme iletilencontroller
parametresi birReadableStreamDefaultController
.pull(controller)
: Daha fazla parça getirildikçe akışı kontrol etmek için kullanılabilir. Akışın dahili parça sırası dolana kadar veya sıra en yüksek seviyesine ulaşana kadar tekrar tekrar çağrılır.pull()
çağrısının sonucu bir söz ise bu söz yerine getirilene kadarpull()
tekrar çağrılmaz. Söz reddedilirse akış hatalı hale gelir.cancel(reason)
: Akış tüketicisi akışı iptal ettiğinde çağrılır.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
aşağıdaki yöntemleri destekler:
ReadableStreamDefaultController.close()
ilişkili akışı kapatır.ReadableStreamDefaultController.enqueue()
belirli bir parçayı ilişkili akışta sıraya alır.ReadableStreamDefaultController.error()
ilişkili akışla gelecekteki tüm etkileşimlerin hata vermesine neden olur.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
ReadableStream()
oluşturucusunun ikinci ve isteğe bağlı olan bağımsız değişkeni queuingStrategy
'dir.
Bu, akış için isteğe bağlı olarak bir sıralama stratejisi tanımlayan ve iki parametre alan bir nesnedir:
highWaterMark
: Bu sıralama stratejisi kullanılarak akışın en yüksek noktasını gösteren negatif olmayan bir sayı.size(chunk)
: Belirtilen parça değerinin sonlu ve negatif olmayan boyutunu hesaplayıp döndüren bir işlev. Sonuç, uygunReadableStreamDefaultController.desiredSize
özelliği aracılığıyla kendini gösteren geri basıncı belirlemek için kullanılır. Ayrıca, temel kaynağınpull()
yönteminin ne zaman çağrılacağını da yönetir.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
getReader()
ve read()
yöntemleri
Okunabilir bir akıştan okumak için ReadableStreamDefaultReader
olan bir okuyucuya ihtiyacınız vardır.
ReadableStream
arayüzünün getReader()
yöntemi bir okuyucu oluşturur ve akışı bu okuyucuya kilitler. Akış kilitliyken bu okuyucu serbest bırakılana kadar başka okuyucu alınamaz.
ReadableStreamDefaultReader
arayüzünün read()
yöntemi, akışın dahili kuyruğundaki bir sonraki parçaya erişim sağlayan bir söz döndürür. Akışın durumuna bağlı olarak bir sonuçla yerine getirir veya reddeder. Farklı olasılıklar şunlardır:
- Bir parça varsa söz,
{ value: chunk, done: false }
biçiminde bir nesneyle yerine getirilir. - Akış kapatılırsa söz,
{ value: undefined, done: true }
biçiminde bir nesneyle yerine getirilir. - Akışta hata oluşursa söz, ilgili hatayla birlikte reddedilir.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
locked
özelliği
Okunabilir bir akışın kilitli olup olmadığını, akışın ReadableStream.locked
özelliğine erişerek kontrol edebilirsiniz.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Okunabilir akış kod örnekleri
Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir. Öncelikle, ReadableStream
oluşturursunuz. Bu ReadableStream
, underlyingSource
bağımsız değişkeninde (yani TimestampSource
sınıfında) start()
yöntemini tanımlar.
Bu yöntem, akışın controller
enqueue()
saniyede bir zaman damgası eklemesini sağlar.
Son olarak, kumandaya akışı close()
etmesini söyler. Bu akışı, getReader()
yöntemiyle bir okuyucu oluşturup akış done
olana kadar read()
yöntemini çağırarak tüketirsiniz.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Eşzamansız yineleme
Akışın done
olup olmadığını her read()
döngü yinelemesinde kontrol etmek en uygun API olmayabilir.
Neyse ki yakında bu işlemi yapmanın daha iyi bir yolu olacak: eşzamansız yineleme.
for await (const chunk of stream) {
console.log(chunk);
}
Eş zamansız yinelemeyi kullanmak için geçici bir çözüm olarak, davranışı bir polyfill ile uygulayabilirsiniz.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Okunabilir bir akışı hazırlama
tee()
yöntemi, ReadableStream
arayüzü mevcut okunabilir akışı hazırlar ve sonuçtaki iki dalı yeni ReadableStream
örnekleri olarak içeren iki öğeli bir dizi döndürür. Bu sayede iki okuyucu aynı anda bir akışı okuyabilir. Örneğin, sunucudan bir yanıt getirip tarayıcıya aktarmak ve aynı zamanda hizmet çalışanı önbelleğine aktarmak istiyorsanız bunu bir hizmet çalışanında yapabilirsiniz. Yanıt gövdesi birden fazla kez kullanılamadığından bunu yapmak için iki kopya gerekir. Akışı iptal etmek için sonuçlanan her iki dalı da iptal etmeniz gerekir. Bir yayını başlatmak
genellikle yayını süre boyunca kilitler ve diğer okuyucuların kilitlemesini engeller.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Okunabilir bayt akışları
Baytları temsil eden akışlar için, özellikle kopyaları en aza indirerek baytları verimli bir şekilde işlemek üzere okunabilir akışın genişletilmiş bir sürümü sağlanır. Bayt akışları, kendi arabelleğini getirme (BYOB) okuyucularının edinilmesine olanak tanır. Varsayılan uygulama, WebSockets durumunda dizeler veya dizi arabellekleri gibi çeşitli çıkışlar verebilirken bayt akışları bayt çıkışını garanti eder. Ayrıca BYOB okuyucular kararlılık avantajlarına sahiptir. Bunun nedeni, bir arabellek ayrılırsa aynı arabelleğe iki kez yazılmayacağının garanti edilebilmesi ve böylece yarış koşullarının önlenmesidir. BYOB okuyucular, arabellekleri yeniden kullanabildikleri için tarayıcının çöp toplama işlemini çalıştırması gereken süreyi azaltabilir.
Okunabilir bir bayt akışı oluşturma
type
oluşturucusuna ek bir ReadableStream()
parametresi ileterek okunabilir bir bayt akışı oluşturabilirsiniz.
new ReadableStream({ type: 'bytes' });
underlyingSource
Okunabilir bir bayt akışının temel kaynağı, üzerinde işlem yapmak için ReadableByteStreamController
olarak verilir. ReadableByteStreamController.enqueue()
yöntemi, değeri ArrayBufferView
olan bir chunk
bağımsız değişkeni alır. ReadableByteStreamController.byobRequest
özelliği, mevcut BYOB çekme isteğini veya yoksa boş değeri döndürür. Son olarak, ReadableByteStreamController.desiredSize
özelliği, kontrollü akışın dahili kuyruğunu doldurmak için istenen boyutu döndürür.
queuingStrategy
ReadableStream()
oluşturucusunun ikinci bağımsız değişkeni de isteğe bağlıdır ve queuingStrategy
'dir.
Bu, isteğe bağlı olarak akış için bir kuyruğa alma stratejisi tanımlayan ve tek bir parametre alan bir nesnedir:
highWaterMark
: Bu kuyruğa alma stratejisi kullanılarak akışın yüksek su işareti gösteren negatif olmayan bir bayt sayısı. Bu, uygunReadableByteStreamController.desiredSize
özelliği aracılığıyla kendini gösteren geri basıncı belirlemek için kullanılır. Ayrıca, temel kaynağınpull()
yönteminin ne zaman çağrılacağını da yönetir.
getReader()
ve read()
yöntemleri
Ardından, ReadableStreamBYOBReader
parametresini uygun şekilde ayarlayarak ReadableStreamBYOBReader
erişebilirsiniz:
ReadableStream.getReader({ mode: "byob" })
.mode
Bu sayede, kopyaları önlemek için arabellek ayırma işlemi üzerinde daha hassas bir kontrol sağlanır. Bayt akışından okumak için ReadableStreamBYOBReader.read(view)
işlevini çağırmanız gerekir. Burada view
, ArrayBufferView
değeridir.
Okunabilir bayt akışı kod örneği
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Aşağıdaki işlev, rastgele oluşturulmuş bir dizinin verimli sıfır kopyalı okunmasına olanak tanıyan okunabilir bayt akışları döndürür. 1.024'lük önceden belirlenmiş bir parça boyutu kullanmak yerine, geliştirici tarafından sağlanan arabelleği doldurmaya çalışır ve tam kontrol sağlar.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Yazılabilir akışın mekanizması
Yazılabilir akış, veri yazabileceğiniz bir hedeftir ve JavaScript'te WritableStream
nesnesiyle temsil edilir. Bu, temel bir alıcı üzerinde soyutlama görevi görür. Temel alıcı, ham verilerin yazıldığı daha düşük düzeyli bir G/Ç alıcısıdır.
Veriler, akışa yazıcı aracılığıyla tek seferde bir parça olacak şekilde yazılır. Bir okuyucudaki parçalar gibi, parçalar da çok çeşitli şekillerde olabilir. Yazmaya hazır parçalar oluşturmak için istediğiniz kodu kullanabilirsiniz. Yazar ve ilişkili kod, üretici olarak adlandırılır.
Bir yazar oluşturulup bir akışa yazmaya başladığında (etkin yazar) akışa kilitlendiği söylenir. Yazılabilir bir akışa aynı anda yalnızca bir yazar yazabilir. Başka bir yazarın akışınıza yazmaya başlamasını istiyorsanız genellikle akışı serbest bırakmanız ve ardından başka bir yazarı eklemeniz gerekir.
Dahili kuyruk, akışa yazılan ancak henüz temel hedef tarafından işlenmemiş parçaları takip eder.
Sıraya alma stratejisi, bir akışın dahili sırasının durumuna göre nasıl geri basınç sinyali vermesi gerektiğini belirleyen bir nesnedir. Kuyruğa alma stratejisi, her bir parçaya bir boyut atar ve kuyruktaki tüm parçaların toplam boyutunu, yüksek su işareti olarak bilinen belirtilen bir sayıyla karşılaştırır.
Son yapıya denetleyici adı verilir. Yazılabilir her akışın, akışı kontrol etmenize (ör. akışı durdurmak için) olanak tanıyan ilişkili bir denetleyicisi vardır.
Yazılabilir akış oluşturma
Streams API'nin WritableStream
arayüzü, akış verilerini hedef olarak bilinen bir alıcıya yazmak için standart bir soyutlama sağlar. Bu nesne, yerleşik geri basınç ve sıralama özellikleriyle birlikte gelir. Yazılabilir bir akış oluşturmak için
yapılandırıcısını çağırın
WritableStream()
.
Oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntemler ve özellikler içeren bir nesneyi temsil eden isteğe bağlı bir underlyingSink
parametresi vardır.
underlyingSink
underlyingSink
, aşağıdaki isteğe bağlı, geliştirici tanımlı yöntemleri içerebilir. Bazı yöntemlere iletilen controller
parametresi bir WritableStreamDefaultController
.
start(controller)
: Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Bu yöntemin içeriği, temel alttaki öğeye erişmeyi amaçlamalıdır. Bu işlem eşzamansız olarak yapılacaksa başarıyı veya başarısızlığı bildirmek için bir söz döndürebilir.write(chunk, controller)
: Bu yöntem, yeni bir veri parçası (chunk
parametresinde belirtilir) temel alttaki havuzda yazılmaya hazır olduğunda çağrılır. Yazma işleminin başarılı veya başarısız olduğunu bildirmek için bir söz döndürebilir. Bu yöntem yalnızca önceki yazma işlemleri başarılı olduktan sonra çağrılır ve akış kapatıldıktan veya iptal edildikten sonra asla çağrılmaz.close(controller)
: Bu yöntem, uygulama akışa parça yazmayı bitirdiğini işaret ederse çağrılır. İçerikler, temel hedefe yazma işlemlerini tamamlamak ve erişimi serbest bırakmak için gereken her şeyi yapmalıdır. Bu işlem eşzamansızsa başarıyı veya başarısızlığı bildirmek için bir söz döndürebilir. Bu yöntem yalnızca sıraya alınmış tüm yazma işlemleri başarılı olduktan sonra çağrılır.abort(reason)
: Bu yöntem, uygulama akışı aniden kapatmak ve hata durumuna geçirmek istediğini belirtirse çağrılır.close()
işlevi gibi bekletilen kaynakları temizleyebilir ancak yazma işlemleri sıraya alınmış olsa bileabort()
işlevi çağrılır. Bu parçalar atılır. Bu işlem eşzamansızsa başarıyı veya başarısızlığı bildirmek için bir söz döndürebilir.reason
parametresi, akışın neden durdurulduğunu açıklayan birDOMString
içerir.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Streams API'nin WritableStreamDefaultController
arayüzü, kurulum sırasında, yazma için daha fazla parça gönderilirken veya yazma işleminin sonunda WritableStream
durumunun kontrol edilmesini sağlayan bir denetleyiciyi temsil eder. WritableStream
oluşturulurken temel alıcıya, üzerinde işlem yapmak için karşılık gelen bir WritableStreamDefaultController
örneği verilir. WritableStreamDefaultController
yalnızca tek bir yönteme sahip:WritableStreamDefaultController.error()
. Bu durum, ilişkili yayınla gelecekteki tüm etkileşimlerin hata vermesine neden olur.
WritableStreamDefaultController
, AbortSignal
örneğini döndüren bir signal
özelliğini de destekler. Bu özellik, gerekirse WritableStream
işleminin durdurulmasına olanak tanır.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
WritableStream()
oluşturucusunun ikinci bağımsız değişkeni de isteğe bağlıdır ve queuingStrategy
'dir.
Bu, akış için isteğe bağlı olarak bir sıralama stratejisi tanımlayan ve iki parametre alan bir nesnedir:
highWaterMark
: Bu sıralama stratejisi kullanılarak akışın en yüksek noktasını gösteren negatif olmayan bir sayı.size(chunk)
: Belirtilen parça değerinin sonlu ve negatif olmayan boyutunu hesaplayıp döndüren bir işlev. Sonuç, uygunWritableStreamDefaultWriter.desiredSize
özelliği aracılığıyla kendini gösteren geri basıncı belirlemek için kullanılır.
getWriter()
ve write()
yöntemleri
Yazılabilir bir akışa yazmak için bir yazıcıya ihtiyacınız vardır. Yazıcı, WritableStreamDefaultWriter
olur. WritableStream
arayüzünün getWriter()
yöntemi, WritableStreamDefaultWriter
öğesinin yeni bir örneğini döndürür ve akışı bu örnekle kilitler. Akış kilitliyken mevcut yazar serbest bırakılana kadar başka yazar alınamaz.
write()
arayüzünün WritableStreamDefaultWriter
yöntemi, iletilen bir veri parçasını WritableStream
ve temelindeki hedefe yazar, ardından yazma işleminin başarılı veya başarısız olduğunu belirten bir söz döndürür. "Başarı"nın ne anlama geldiğinin temel alıcıya bağlı olduğunu unutmayın. Bu, parçanın kabul edildiğini gösterebilir ancak nihai hedefine güvenli bir şekilde kaydedildiğini göstermeyebilir.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
locked
özelliği
Yazılabilir bir akışın kilitli olup olmadığını, WritableStream.locked
özelliğine erişerek kontrol edebilirsiniz.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Yazılabilir akış kod örneği
Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Okunabilir bir akışı yazılabilir bir akışa yönlendirme
Okunabilir bir akış, okunabilir akışın pipeTo()
yöntemiyle yazılabilir bir akışa yönlendirilebilir.
ReadableStream.pipeTo()
, mevcut ReadableStream
öğesini belirli bir WritableStream
öğesine yönlendirir ve yönlendirme işlemi başarıyla tamamlandığında yerine getirilen veya herhangi bir hatayla karşılaşıldığında reddedilen bir söz döndürür.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Dönüşüm akışı oluşturma
Streams API'nin TransformStream
arayüzü, dönüştürülebilir bir veri grubunu temsil eder. Yapılandırıcıyı TransformStream()
çağırarak bir dönüştürme akışı oluşturursunuz. Bu, verilen işleyicilerden bir dönüştürme akışı nesnesi oluşturup döndürür. TransformStream()
oluşturucusu, ilk bağımsız değişken olarak transformer
öğesini temsil eden isteğe bağlı bir JavaScript nesnesi kabul eder. Bu tür nesneler aşağıdaki yöntemlerden herhangi birini içerebilir:
transformer
start(controller)
: Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Bu genelliklecontroller.enqueue()
kullanılarak önek parçalarını sıraya almak için kullanılır. Bu parçalar okunabilir taraftan okunur ancak yazılabilir tarafa yapılan herhangi bir yazma işlemine bağlı değildir. Bu ilk işlem, örneğin önek parçalarını edinmek biraz çaba gerektirdiğinden eşzamansızsa işlev, başarıyı veya başarısızlığı bildirmek için bir söz döndürebilir. Reddedilen bir söz, akışta hataya neden olur. Oluşan tüm istisnalar,TransformStream()
oluşturucusu tarafından yeniden oluşturulur.transform(chunk, controller)
: Bu yöntem, yazılabilir tarafa ilk olarak yazılan yeni bir parça dönüştürülmeye hazır olduğunda çağrılır. Akış uygulaması, bu işlevin yalnızca önceki dönüştürmeler başarılı olduktan sonra çağrılacağını ve hiçbir zamanstart()
tamamlanmadan veyaflush()
çağrıldıktan sonra çağrılmayacağını garanti eder. Bu işlev, dönüştürme akışının gerçek dönüştürme işlemini gerçekleştirir. Sonuçlarıcontroller.enqueue()
kullanarak sıraya alabilir. Bu,controller.enqueue()
kaç kez çağrıldığına bağlı olarak, yazılabilir tarafa yazılan tek bir parçanın okunabilir tarafta sıfır veya birden çok parçaya neden olmasına izin verir. Dönüştürme işlemi eşzamansızsa bu işlev, dönüştürmenin başarılı veya başarısız olduğunu bildirmek için bir söz döndürebilir. Reddedilen bir söz, dönüştürme akışının hem okunabilir hem de yazılabilir taraflarında hataya neden olur.transform()
yöntemi sağlanmazsa kimlik dönüşümü kullanılır. Bu dönüşüm, yazılabilir taraftaki değişmemiş parçaları okunabilir tarafa sıraya alır.flush(controller)
: Bu yöntem, yazılabilir tarafa yazılan tüm parçalartransform()
'dan başarıyla geçerek dönüştürüldükten ve yazılabilir taraf kapatılmak üzereyken çağrılır. Bu genellikle, okunabilir taraf da kapanmadan önce sonek parçalarını okunabilir tarafa sıraya almak için kullanılır. Temizleme işlemi eşzamansızsa işlev, başarıyı veya başarısızlığı bildirmek için bir söz döndürebilir. Sonuç,stream.writable.write()
işlevinin arayanına iletilir. Ayrıca, reddedilen bir söz, akışın hem okunabilir hem de yazılabilir tarafında hataya neden olur. İstisna oluşturma, reddedilen bir sözü döndürmeyle aynı şekilde değerlendirilir.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
writableStrategy
ve readableStrategy
sıralama stratejileri
TransformStream()
oluşturucusunun ikinci ve üçüncü isteğe bağlı parametreleri writableStrategy
ve readableStrategy
sıralama stratejileridir. Bunlar sırasıyla okunabilir ve yazılabilir akış bölümlerinde belirtildiği şekilde tanımlanır.
Dönüştürme akışı kod örneği
Aşağıdaki kod örneğinde, basit bir dönüştürme akışının nasıl çalıştığı gösterilmektedir.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Okunabilir bir akışı dönüştürme akışından geçirme
ReadableStream
arayüzünün pipeThrough()
yöntemi, mevcut akışı bir dönüştürme akışından veya başka bir yazılabilir/okunabilir çiftten geçirmek için zincirlenebilir bir yol sağlar. Bir akışın yönlendirilmesi, genellikle yönlendirme süresi boyunca akışı kilitler ve diğer okuyucuların akışı kilitlemesini engeller.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Biraz yapay olan sonraki kod örneğinde, döndürülen yanıt sözünü fetch()
akış olarak tüketip metni parça parça büyük harfe çevirerek tüm metni büyük harfe çeviren bir "bağırma" sürümünün nasıl uygulanabileceği gösterilmektedir. Bu yaklaşımın avantajı, tüm belgenin indirilmesini beklemenize gerek olmamasıdır. Bu durum, büyük dosyalarla çalışırken büyük bir fark yaratabilir.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demo
Aşağıdaki demoda okunabilir, yazılabilir ve dönüştürülebilir akışlar gösterilmektedir. Ayrıca pipeThrough()
ve pipeTo()
boru zincirlerine örnekler de içerir ve tee()
'ı gösterir. İsteğe bağlı olarak demoyu kendi penceresinde çalıştırabilir veya kaynak kodunu görüntüleyebilirsiniz.
Tarayıcıda kullanılabilen faydalı yayınlar
Tarayıcıda yerleşik olarak bulunan birçok yararlı yayın vardır. Blob'dan kolayca ReadableStream
oluşturabilirsiniz. Blob
arayüzünün stream() yöntemi, okunduğunda blob'da bulunan verileri döndüren bir ReadableStream
döndürür. Ayrıca, File
nesnesinin belirli bir Blob
türü olduğunu ve blob'un kullanılabildiği her bağlamda kullanılabileceğini unutmayın.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
TextDecoder.decode()
ve TextEncoder.encode()
öğelerinin yayın varyantlarına sırasıyla TextDecoderStream
ve TextEncoderStream
denir.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
CompressionStream
ve DecompressionStream
dönüştürme akışlarını kullanarak dosyaları kolayca sıkıştırabilir veya açabilirsiniz. Aşağıdaki kod örneğinde, Streams spesifikasyonunu nasıl indirebileceğiniz, doğrudan tarayıcıda nasıl sıkıştırabileceğiniz (gzip) ve sıkıştırılmış dosyayı doğrudan diske nasıl yazabileceğiniz gösterilmektedir.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
File System Access API'nin
FileSystemWritableFileStream
ve deneysel fetch()
istek akışları, gerçek hayattaki yazılabilir akışlara örnek olarak verilebilir.
Serial API, hem okunabilir hem de yazılabilir akışları yoğun bir şekilde kullanır.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Son olarak, WebSocketStream
API, akışları WebSocket API ile entegre eder.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Faydalı kaynaklar
- Akış spesifikasyonu
- Eşlik eden demolar
- Streams polyfill
- 2016: Web akışlarının yılı
- Eşzamansız yineleyiciler ve oluşturucular
- Akış Görselleştirici
Teşekkür
Bu makale; Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley ve Adam Rice tarafından incelenmiştir. Jake Archibald'ın blog yayınları, akışları anlamamda çok yardımcı oldu. Kod örneklerinden bazıları GitHub kullanıcısı @bellbind'in keşiflerinden esinlenmiştir ve metinlerin bazı bölümleri MDN Web Docs on Streams'e dayanmaktadır. Streams Standardı'nın yazarları, bu spesifikasyonu yazarken muazzam bir iş çıkardı. Lokomotif resim, Ryan Lara tarafından Unsplash'te yayınlandı.