Akışlar: Ayrıntılı kılavuz

Streams API ile okunabilir, yazılabilir ve dönüştürülebilir akışları nasıl kullanacağınızı öğrenin.

Streams API, ağ üzerinden alınan veya yerel olarak herhangi bir şekilde oluşturulan veri akışlarına programatik olarak erişmenize ve bunları JavaScript ile işlemenize olanak tanır. Akış, almak, göndermek veya dönüştürmek istediğiniz bir kaynağı küçük parçalara ayırmayı ve ardından bu parçaları bit bit işlemeyi içerir. Akış, web sayfalarında gösterilecek HTML veya video gibi öğeleri alan tarayıcıların zaten yaptığı bir işlem olsa da 2015'te akışlarla fetch kullanıma sunulmadan önce bu özellik JavaScript'de kullanılamamıştı.

Daha önce, bir tür kaynağı (ör. video veya metin dosyası) işlemek istiyorsanız dosyanın tamamını indirmeniz, uygun bir biçime serileştirilmesini beklemeniz ve ardından dosyayı işlemeniz gerekiyordu. JavaScript'in akışları kullanabilmesiyle tüm bunlar değişiyor. Artık istemcide kullanılabilir hale gelir gelmez ham verileri JavaScript ile kademeli olarak işleyebilir, arabelleğe, dizeye veya blob'a gerek kalmaz. Bu, bazılarını aşağıda listelediğim çeşitli kullanım alanlarına olanak tanır:

  • Video efektleri: Okunabilir bir video akışını, efektleri gerçek zamanlı olarak uygulayan bir dönüştürme akışı üzerinden aktarma.
  • Veri sıkıştırma/sıkıştırma açma: Bir dosya akışının, seçici bir şekilde sıkıştırıldığı/sıkıştırıldığı bir dönüştürme akışı üzerinden aktarılması.
  • Resim kodunun çözülmesi: Bir HTTP yanıtı akışının, baytların kodunun bitmap verilerine dönüştürüldüğü bir dönüştürme akışı ve ardından bitmap'leri PNG'ye çeviren başka bir dönüştürme akışı üzerinden aktarılması. Bir hizmet çalışanının fetch işleyicisine yüklenirse AVIF gibi yeni resim biçimlerini şeffaf bir şekilde doldurabilirsiniz.

Tarayıcı desteği

ReadableStream ve WritableStream

Tarayıcı desteği

  • Chrome: 43.
  • Kenar: 14.
  • Firefox: 65.
  • Safari: 10.1.

Kaynak

TransformStream

Tarayıcı desteği

  • Chrome: 67.
  • Kenar: 79.
  • Firefox: 102.
  • Safari: 14.1.

Kaynak

Temel kavramlar

Çeşitli yayın türleriyle ilgili ayrıntılara geçmeden önce bazı temel kavramları açıklamak isterim.

Büyük Parça

Bir parça, bir akışa yazılan veya bir akıştan okunan tek bir veri parçasıdır. Herhangi bir türde olabilir; hatta farklı türde parçalar içerebilir. Çoğu zaman, bir veri kümesi belirli bir akış için en atomik veri birimi olmaz. Örneğin, bir bayt akışı tek baytlar yerine 16 KiB Uint8Array biriminden oluşan parçalar içerebilir.

Okunabilir akışlar

Okunabilir akış, okuyabileceğiniz bir veri kaynağını temsil eder. Diğer bir deyişle, veriler okunabilir bir akıştan çıkar. Daha açık belirtmek gerekirse, okunabilir akış, ReadableStream sınıfının bir örneğidir.

Yazılabilir akışlar

Yazılabilir akış, veri yazabileceğimiz bir hedefi temsil eder. Başka bir deyişle, veriler yazılabilir bir akışa girer. Yazılabilir akış, WritableStream sınıfının bir örneğidir.

Akışları dönüştürme

Dönüşüm akışı, bir çift akıştan oluşur: Yazılabilir tarafı olarak bilinen yazılabilir bir akış ve okunabilir tarafı olarak bilinen okunabilir bir akış. Bu durumu gerçek dünyada bir metaforla açıklamak gerekirse, bir dilden diğerine anında çeviri yapan simultane çevirmen olarak örnek verilebilir. Dönüşüm akışına özgü bir şekilde, yazılabilir tarafa yazma işlemi, okunabilir taraftan okunmaya hazır yeni verilerin sunulmasına neden olur. Daha açık belirtmek gerekirse, writable ve readable özelliğine sahip tüm nesneler dönüşüm akışı olarak kullanılabilir. Bununla birlikte, standart TransformStream sınıfı, düzgün şekilde dolanmış böyle bir çift oluşturmayı kolaylaştırır.

Boru zincirleri

Akışlar, öncelikle birbirlerine bağlantı verilerek kullanılır. Okunabilir bir akış, okunabilir akışın pipeTo() yöntemi kullanılarak doğrudan yazılabilir bir akışa bağlanabilir veya okunabilir akışın pipeThrough() yöntemi kullanılarak önce bir veya daha fazla dönüşüm akışından geçirilebilir. Bu şekilde birleştirilmiş bir akış grubuna boru zinciri denir.

Geri basınç

Bir boru zinciri oluşturulduktan sonra, parçaların içinden ne kadar hızlı akması gerektiğine dair sinyaller yayılır. Zincirdeki herhangi bir adım henüz parçaları kabul edemiyorsa boru zinciri boyunca geriye doğru bir sinyal yayılır. Bu sinyal, sonunda orijinal kaynağa bu kadar hızlı parça üretmeyi bırakması söylenene kadar devam eder. Bu akışın normalleştirilmesi sürecine karşı basınç adı verilir.

Tişört

Okunabilir bir akış, tee() yöntemi kullanılarak yan dallara ayrılabilir (büyük harfli "T" şeklinden dolayı bu şekilde adlandırılır). Bu işlem, yayını kilitler, yani artık doğrudan kullanılamaz hale getirir. Ancak bağımsız olarak kullanılabilen iki yeni yayın (dal olarak adlandırılır) oluşturur. Akışlar geri sarılamadığı veya yeniden başlatılamadığı için başlangıç noktası da önemlidir. Bu konu hakkında daha fazla bilgiyi aşağıda bulabilirsiniz.

Fetch API'ye yapılan bir çağrıdan gelen ve daha sonra çıkışı ayrılan bir dönüştürme akışı üzerinden aktarılan ve ardından ilk okunabilir akış için tarayıcıya ve ikinci okunabilir akış için hizmet çalışanı önbelleğiye gönderilen okunabilir bir akıştan oluşan bir boru zinciri şeması.
Bir boru zinciri.

Okunabilir bir yayının işleyiş şekli

Okunabilir akış, JavaScript'te temel bir kaynaktan akan bir ReadableStream nesnesi tarafından temsil edilen bir veri kaynağıdır. ReadableStream() sınıfının kurucusu, belirtilen işleyicilerden okunabilir bir akış nesnesi oluşturur ve döndürür. İki tür temel kaynak vardır:

  • Aktarıcı kaynaklar, eriştiğinizde size sürekli olarak veri aktarır. Akışa erişimi başlatmak, duraklatmak veya iptal etmek ise size bağlıdır. Örnekler arasında canlı video akışları, sunucu tarafından gönderilen etkinlikler veya WebSocket'ler yer alır.
  • Alma kaynakları, bağlandıktan sonra bu kaynaklardan açıkça veri istemenizi gerektirir. Buna örnek olarak fetch() veya XMLHttpRequest çağrıları aracılığıyla yapılan HTTP işlemleri verilebilir.

Akış verileri, parça adı verilen küçük parçalar halinde sırayla okunur. Bir akışa yerleştirilen parçaların sıraya eklenmiş olduğu söylenir. Bu, okunmaya hazır olarak bir sırada bekledikleri anlamına gelir. Dahili sıra, henüz okunmamış parçaları takip eder.

Sıraya ekleme stratejisi, bir akışın dahili sıranın durumuna göre karşı basıncı nasıl işaret edeceğini belirleyen bir nesnedir. Sıralama stratejisi her bir parçaya bir boyut atar ve sıradaki tüm parçaların toplam boyutunu maksimum değer olarak bilinen belirli bir sayıyla karşılaştırır.

Akıştaki parçalar bir okuyucu tarafından okunur. Bu okuyucu, verileri birer parça halinde alır ve üzerinde istediğiniz türde işlem yapmanıza olanak tanır. Okuyucu ve onunla birlikte gelen diğer işleme kodu tüketici olarak adlandırılır.

Bu bağlamda bir sonraki yapıya denetleyici denir. Her okunabilir akış, adından da anlaşılacağı gibi akışı kontrol etmenizi sağlayan ilişkili bir denetleyiciye sahiptir.

Bir akışı aynı anda yalnızca bir okuyucu okuyabilir. Bir okuyucu, oluşturulup okumaya başladığında (yani etkin bir okuyucu haline geldiğinde) akışa kilitlenir. Aktardığınız metni başka bir okuyucunun okumasını istiyorsanız genellikle başka bir işlem yapmadan önce ilk okuyucuyu bırakmanız gerekir (ancak aktarmaları bölebilirsiniz).

Okunabilir bir akış oluşturma

Oluşturucusunu çağırarak okunabilir bir akış oluşturursunuzReadableStream(). Oluşturucu, oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntem ve özelliklere sahip bir nesneyi temsil eden isteğe bağlı bir bağımsız değişkene underlyingSource sahiptir.

underlyingSource

Bu işlem için geliştirici tarafından tanımlanan aşağıdaki isteğe bağlı yöntemler kullanılabilir:

  • start(controller): Nesne oluşturulduğunda hemen çağrılır. Yöntem, akış kaynağına erişebilir ve akış işlevini ayarlamak için gereken diğer her şeyi yapabilir. Bu işlem eşzamansız olarak yapılacaksa yöntem, başarı veya başarısızlığı bildirmek için bir promise döndürebilir. Bu yönteme iletilen controller parametresi bir ReadableStreamDefaultController bağımsız değişkenidir.
  • pull(controller): Daha fazla parça getirilirken yayını kontrol etmek için kullanılabilir. Akıştaki dahili parça kuyruğu dolu olmadığı sürece ve kuyruk en yüksek işaretine ulaşana kadar tekrar tekrar çağrılır. pull() çağrılmasının sonucu bir sözse söz yerine getirilene kadar pull() tekrar çağrılmaz. Sözleşme reddedilirse akışta hata oluşur.
  • cancel(reason): Yayın tüketicisi, yayını iptal ettiğinde çağrılır.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController aşağıdaki yöntemleri destekler:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Benzer şekilde, ReadableStream() kurucusunun ikinci bağımsız değişkeni queuingStrategy şeklindedir. Bu, isteğe bağlı olarak akış için iki parametre alan bir sıraya ekleme stratejisini tanımlayan bir nesnedir:

  • highWaterMark: Bu sıraya ekleme stratejisinin kullanıldığı yayının en yüksek noktasını gösteren sıfırdan büyük bir sayı.
  • size(chunk): Belirtilen parça değerinin sonlu ve negatif olmayan boyutunu hesaplayıp döndüren bir işlev. Sonuç, geri basıncı belirlemek için kullanılır ve uygun ReadableStreamDefaultController.desiredSize mülkü aracılığıyla gösterilir. Ayrıca, temel kaynağın pull() yönteminin ne zaman çağrılacağını da belirler.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader() ve read() yöntemleri

Okunabilir bir akıştan okumak için bir okuyucuya ihtiyacınız vardır. Bu okuyucu, ReadableStreamDefaultReader olacaktır. ReadableStream arayüzünün getReader() yöntemi bir okuyucu oluşturur ve akışı bu okuyucuya kilitler. Akış kilitliyken bu okuyucu serbest bırakılana kadar başka okuyucu edinilemez.

ReadableStreamDefaultReader arayüzünün read() yöntemi, akışın dahili sırasındaki bir sonraki parçaya erişim sağlayan bir taahhüt döndürür. Akış durumuna bağlı olarak bir sonuçla isteği yerine getirir veya reddeder. Olasılıklar şunlardır:

  • Bir parça mevcutsa söz,
    { value: chunk, done: false } biçiminde bir nesneyle yerine getirilir.
  • Akış kapatılırsa söz,
    { value: undefined, done: true } biçiminde bir nesneyle yerine getirilir.
  • Akışta hata oluşursa söz konusu hata ile birlikte söz reddedilir.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked mülkü

Okunabilir bir yayının kilitli olup olmadığını kontrol etmek için ReadableStream.locked özelliğine erişebilirsiniz.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Okunabilir akış kod örnekleri

Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir. Önce, underlyingSource bağımsız değişkeninde (yani TimestampSource sınıfı) bir start() yöntemi tanımlayan bir ReadableStream oluşturursunuz. Bu yöntem, yayının controller'e on saniye boyunca her saniye bir zaman damgası enqueue() göndermesini söyler. Son olarak da akış için close() komutunu gönderir. Bu akışı, getReader() yöntemiyle bir okuyucu oluşturup akış done olana kadar read() çağrısı yaparak tüketirsiniz.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Eşzamansız iterasyon

Her read() döngü iterasyonunda akış done olup olmadığını kontrol etmek en uygun API olmayabilir. Neyse ki yakında bunu yapmanın daha iyi bir yolu olacak: eşzamansız iterasyon.

for await (const chunk of stream) {
  console.log(chunk);
}

Eş zamansız iterasyonu bugün kullanmak için bir geçici çözüm, davranışı bir polyfill ile uygulamaktır.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Okunabilir bir akış oluşturma

ReadableStream arayüzünün tee() yöntemi, mevcut okunabilir akışı ayırarak yeni ReadableStream örnekleri olarak iki dal içeren iki öğeli bir dizi döndürür. Bu sayede iki okuyucu aynı anda bir yayını okuyabilir. Örneğin, sunucudan bir yanıt almak ve bunu tarayıcıya aktarmak, aynı zamanda hizmet çalışanı önbelleğine aktarmak istiyorsanız bunu bir hizmet çalışanında yapabilirsiniz. Yanıt gövdesi birden fazla kez kullanılamayacağından bunu yapmak için iki kopyaya ihtiyacınız vardır. Ardından, akışı iptal etmek için ortaya çıkan her iki dalı da iptal etmeniz gerekir. Bir yayını başlattığınızda yayın genellikle kilitlenir ve diğer okuyucular tarafından kilitlenmesi engellenir.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Okunabilir bayt akışları

Baytları temsil eden akışlar için, baytları verimli bir şekilde işlemek amacıyla (özellikle kopyaları en aza indirerek) okunabilir akışın genişletilmiş bir sürümü sağlanır. Bayt akışları, kendi arabelleğinizi getirme (BYOB) okuyucularının edinilmesine olanak tanır. Varsayılan uygulama, WebSocket'ler söz konusu olduğunda dize veya dizi arabellekleri gibi çeşitli farklı çıkışlar verebilir. Byte akışları ise bayt çıkışını garanti eder. Ayrıca, KOBİ okuyucularının kararlılıkla ilgili avantajları vardır. Bunun nedeni, bir arabelleğin ayrılması durumunda aynı arabelleğe iki kez yazılmayacağının garanti edilebilmesi ve böylece yarış koşullarının önlenebilmesidir. BYOB okuyucuları, tarayıcı tamponları yeniden kullanabileceği için atık toplama çalıştırması gereken süreyi azaltabilir.

Okunabilir bir bayt akışı oluşturma

ReadableStream() oluşturucusuna ek bir type parametresi ileterek okunabilir bir bayt akışı oluşturabilirsiniz.

new ReadableStream({ type: 'bytes' });

underlyingSource

Okunabilir bir bayt akışının temel kaynağına, üzerinde işlem yapmak için bir ReadableByteStreamController verilir. Bu ReadableByteStreamController.enqueue() yöntemi, değeri ArrayBufferView olan bir chunk bağımsız değişkeni alır. ReadableByteStreamController.byobRequest özelliği, geçerli BYOB pull isteğini döndürür. İstek yoksa boş değer döndürür. Son olarak, ReadableByteStreamController.desiredSize özelliği, kontrollü akışın dahili sırasını doldurmak için istenen boyutu döndürür.

queuingStrategy

Benzer şekilde, ReadableStream() kurucusunun ikinci bağımsız değişkeni queuingStrategy şeklindedir. İsteğe bağlı olarak akış için bir sıra stratejisi tanımlayan ve bir parametre alan bir nesnedir:

  • highWaterMark: Bu sıraya ekleme stratejisini kullanan yayının en yüksek noktasını gösteren sıfırdan büyük bir bayt sayısı. Bu, geri basıncı belirlemek için kullanılır ve uygun ReadableByteStreamController.desiredSize mülkü aracılığıyla gösterilir. Ayrıca, temel kaynağın pull() yönteminin ne zaman çağrılacağını da belirler.

getReader() ve read() yöntemleri

Ardından, mode parametresini uygun şekilde ayarlayarak ReadableStreamBYOBReader'e erişebilirsiniz: ReadableStream.getReader({ mode: "byob" }). Bu sayede, kopyalardan kaçınmak için arabellek ayırma üzerinde daha hassas kontrol sağlanır. Bayt akışından okumak için ReadableStreamBYOBReader.read(view) işlevini çağırmanız gerekir. Burada view bir ArrayBufferView bağımsız değişkenidir.

Okunabilir bayt akışı kod örneği

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Aşağıdaki işlev, rastgele oluşturulmuş bir dizinin sıfır kopyayla verimli bir şekilde okunmasına olanak tanıyan okunabilir bayt akışları döndürür. Önceden belirlenmiş 1.024 parça boyutunu kullanmak yerine, tam kontrol sağlamak için geliştirici tarafından sağlanan arabelleği doldurmaya çalışır.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Yazılabilir akışların işleyiş şekli

Yazılabilir akış, veri yazabileceğiniz bir hedeftir. JavaScript'te WritableStream nesnesiyle gösterilir. Bu, alttaki havuzun, yani ham verilerin yazıldığı daha düşük seviyeli bir G/Ç havuzunun üst kısmı üzerinde bir soyutlama görevi görür.

Veriler, tek seferde bir parça olacak şekilde bir yazar aracılığıyla akışa yazılır. Bir parça, tıpkı bir okuyucudaki parçalar gibi birçok şekilde olabilir. Yazmaya hazır parçalar oluşturmak için istediğiniz kodu kullanabilirsiniz. Yazıcı ve ilişkili koda üretici denir.

Bir yazar oluşturulduğunda ve bir akışa yazmaya başladığında (etkin yazar), akışa kilitlendiği söylenir. Yazılabilir bir akışa aynı anda yalnızca bir yazar yazabilir. Başka bir yazarın akışınıza yazmaya başlamasını istiyorsanız genellikle yayını yayınlamanız ve ardından başka bir yazar eklemeniz gerekir.

Dahili bir sıra, akışa yazılmış ancak henüz temel alıcı tarafından işlenmemiş parçaları izler.

Sıraya ekleme stratejisi, bir akışın dahili sıranın durumuna göre karşı basıncı nasıl işaret edeceğini belirleyen bir nesnedir. Sıralama stratejisi her bir parçaya bir boyut atar ve sıradaki tüm parçaların toplam boyutunu maksimum değer olarak bilinen belirli bir sayıyla karşılaştırır.

Nihai yapıya denetleyici adı verilir. Her yazılabilir akışın, akışı kontrol etmenize (örneğin, iptal etmenize) olanak tanıyan ilişkili bir denetleyicisi vardır.

Yazılabilir akış oluşturma

Streams API'nin WritableStream arayüzü, aktarma noktası olarak bilinen bir hedefe aktarma verileri yazmak için standart bir soyutlama sağlar. Bu nesne, yerleşik karşı basınç ve sıraya ekleme özelliğine sahiptir. Oluşturucusunu WritableStream() çağırarak yazılabilir bir akış oluşturursunuz. Oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntem ve özelliklere sahip bir nesneyi temsil eden isteğe bağlı bir underlyingSink parametresi vardır.

underlyingSink

underlyingSink, geliştirici tarafından tanımlanan aşağıdaki isteğe bağlı yöntemleri içerebilir. Bazı yöntemlere iletilen controller parametresi WritableStreamDefaultController şeklindedir.

  • start(controller): Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Bu yöntemin içeriği, temel lavaboya erişmeyi amaçlamalıdır. Bu işlem eşzamansız olarak yapılacaksa başarı veya başarısızlığı bildiren bir promise döndürülebilir.
  • write(chunk, controller): Bu yöntem, yeni bir veri parçası (chunk parametresinde belirtilir) temel alıcıya yazılmaya hazır olduğunda çağrılır. Yazma işleminin başarılı veya başarısız olduğunu belirtmek için bir promise döndürebilir. Bu yöntem yalnızca önceki yazma işlemleri başarılı olduktan sonra çağrılır ve hiçbir zaman akış kapatıldıktan veya iptal edildikten sonra çağrılmaz.
  • close(controller): Uygulama, akışa parçalar yazmayı tamamladığını bildiriyorsa bu yöntem çağrılır. İçerikler, temeldeki havuza yazma işlemlerini tamamlamak ve bu havuza erişimi serbest bırakmak için gerekeni yapmalıdır. Bu süreç eşzamansızsa başarıya veya başarısızlığa işaret eden bir vaat döndürebilir. Bu yöntem yalnızca sıraya alınmış tüm yazma işlemleri başarılı olduktan sonra çağrılır.
  • abort(reason): Bu yöntem, uygulama akışı aniden kapatmak ve işlemi hatalı duruma getirmek istediğini bildiriyorsa çağrılır. close() gibi bekletilen kaynakları temizleyebilir ancak yazma işlemleri sıraya alınmış olsa bile abort() çağrılır. Bu parçalar çöpe atılır. Bu işlem eşzamanlı değilse başarı veya başarısızlık sinyali vermek için bir promise döndürebilir. reason parametresi, aktarımın neden iptal edildiğini açıklayan bir DOMString içerir.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API'nin WritableStreamDefaultController arayüzü, yazma işlemi için daha fazla parça gönderilirken veya yazma işleminin sonunda, kurulum sırasında WritableStream durumunun kontrol edilmesine olanak tanıyan bir denetleyiciyi temsil eder. WritableStream oluşturulurken temel havuza, manipüle edilmesi için karşılık gelen bir WritableStreamDefaultController örneği verilir. WritableStreamDefaultController için tek bir yöntem vardır: WritableStreamDefaultController.error() bu yöntem, ilişkilendirilmiş akışla gelecekteki etkileşimlerde hataya neden olur. WritableStreamDefaultController, AbortSignal örneği döndüren bir signal özelliğini de destekler. Bu özellik, gerektiğinde WritableStream işleminin durdurulmasına olanak tanır.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() kurucusunun ikinci, yine isteğe bağlı bağımsız değişkeni queuingStrategy'dur. İsteğe bağlı olarak akış için bir sıra stratejisi tanımlayan bir nesnedir ve iki parametre alır:

  • highWaterMark: Bu sıraya ekleme stratejisinin kullanıldığı yayının en yüksek noktasını gösteren sıfırdan büyük bir sayı.
  • size(chunk): Belirtilen parça değerinin sonlu ve negatif olmayan boyutunu hesaplayıp döndüren bir işlev. Sonuç, geri basıncı belirlemek için kullanılır ve uygun WritableStreamDefaultWriter.desiredSize mülkü aracılığıyla gösterilir.

getWriter() ve write() yöntemleri

Yazılabilir bir akışa yazmak için yazar gerekir. Bu da WritableStreamDefaultWriter olacaktır. WritableStream arayüzünün getWriter() yöntemi, yeni bir WritableStreamDefaultWriter örneği döndürür ve akışı bu örneğe kilitler. Akış kilitliyken mevcut yazar bırakılana kadar başka yazar edinilemez.

WritableStreamDefaultWriter arayüzünün write() yöntemi, geçirilen bir veri parçasını WritableStream ile bunun temel havuzuna yazar, ardından yazma işleminin başarılı veya başarısız olduğunu belirten bir çözüm döndürür. "Başarı"nın ne anlama geldiğine bağlı olduğunu unutmayın. Bu, parçanın kabul edildiğini gösterebilir ancak nihai hedefine güvenli bir şekilde kaydedildiğini göstermeyebilir.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked mülkü

Yazılabilir bir aktarımın kilitli olup olmadığını kontrol etmek için WritableStream.locked mülküne erişebilirsiniz.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Yazılabilir akış kod örneği

Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Okunabilir bir akışı, yazılabilir bir akışa aktarma

Okunabilir bir akış, okunabilir akışın pipeTo() yöntemi aracılığıyla yazılabilir bir akışa aktarılabilir. ReadableStream.pipeTo(), mevcut ReadableStream öğesini belirli bir WritableStream öğesine aktarır ve aktarma işlemi başarıyla tamamlandığında yerine getirilen veya herhangi bir hatayla karşılaşıldığında reddedilen bir promise döndürür.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Dönüşüm akışı oluşturma

Streams API'nin TransformStream arayüzü, dönüştürülebilir bir veri grubunu temsil eder. Oluşturucu TransformStream()'yi çağırarak bir dönüştürme akışı oluşturursunuz. Bu oluşturucu, belirtilen işleyicilerden bir dönüştürme akışı nesnesi oluşturup döndürür. TransformStream() kurucusu, ilk bağımsız değişkeni olarak transformer'yi temsil eden isteğe bağlı bir JavaScript nesnesi kabul eder. Bu tür nesneler, aşağıdaki yöntemlerin herhangi birini içerebilir:

transformer

  • start(controller): Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Genellikle controller.enqueue() kullanılarak ön ek parçalarını sıraya eklemek için kullanılır. Bu parçalar okunabilir taraftan okunur ancak yazılabilir tarafa yapılan yazma işlemlerine bağlı değildir. Bu ilk işlem, örneğin ön ek parçalarını elde etmek biraz çaba gerektirdiği için eşzamanlı değilse işlev, başarı veya başarısızlığı bildirmek için bir söz döndürebilir. Reddedilen bir söz, aktarımda hata oluşturur. Atılan tüm istisnalar, TransformStream() oluşturucusu tarafından yeniden atanır.
  • transform(chunk, controller): Bu yöntem, orijinal olarak yazılabilir tarafa yazılan yeni bir parça dönüştürülmeye hazır olduğunda çağrılır. Akış uygulaması, bu işlevin yalnızca önceki dönüştürme işlemleri başarılı olduktan sonra çağrılacağını ve hiçbir zaman start() tamamlanmadan önce veya flush() çağrıldıktan sonra çağrılmayacağını garanti eder. Bu işlev, dönüştürme akışının gerçek dönüştürme işlemini gerçekleştirir. Sonuçları controller.enqueue() kullanarak sıraya ekleyebilir. Bu, yazılabilir tarafa yazılan tek bir parçanın, controller.enqueue()'ün kaç kez çağrıldığına bağlı olarak okunabilir tarafta sıfır veya birden fazla parçayla sonuçlanmasını sağlar. Dönüşüm işlemi eşzamanlı değilse bu işlev, dönüşümün başarılı veya başarısız olduğunu bildiren bir promise döndürebilir. Reddedilen bir söz, dönüştürme akışının hem okunabilir hem de yazılabilir tarafında hata verir. Hiçbir transform() yöntemi sağlanmazsa kimlik dönüştürme kullanılır. Bu yöntem, yazılabilir taraftan okunabilir tarafa değişmeden parçaları ekler.
  • flush(controller): Bu yöntem, yazılabilir tarafa yazılan tüm parçalar transform() üzerinden başarıyla geçirilerek dönüştürüldükten ve yazılabilir taraf kapatılmak üzereyken çağrılır. Bu genellikle, son ek parçalarını okunabilir tarafa eklemek için kullanılır. Boşaltma işlemi eşzamanlı değilse işlev, başarı veya başarısızlığı belirtmek için bir promise döndürebilir. Sonuç, stream.writable.write() çağrısını yapan kullanıcıya iletilir. Ayrıca, reddedilen bir promise, aktarımın hem okunabilir hem de yazılabilir tarafında hata oluşturur. İstisna atma işlemi, reddedilen bir promise döndürmekle aynı şekilde değerlendirilir.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategy ve readableStrategy sıra stratejileri

TransformStream() oluşturucunun isteğe bağlı ikinci ve üçüncü parametreleri, isteğe bağlı writableStrategy ve readableStrategy sıraya alma stratejileridir. Bunlar sırasıyla readable ve writable akış bölümlerinde belirtildiği şekilde tanımlanır.

Dönüşüm akışı kod örneği

Aşağıdaki kod örneğinde, basit bir dönüştürme akışının işleyişi gösterilmektedir.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Okunabilir bir akışı dönüşüm akışı üzerinden ardışık olarak oluşturma

ReadableStream arayüzünün pipeThrough() yöntemi, mevcut akışı bir dönüştürme akışı veya başka bir yazılabilir/okunabilir çift üzerinden aktarmanın zincirlenebilir bir yolunu sağlar. Bir akışı boruya aktarmak genellikle akışı boru boyunca kilitler ve diğer okuyucuların kilitlemesini engeller.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Bir sonraki kod örneğinde (biraz abartılı) döndürülen yanıt vaadini bir akış olarak ve büyük/küçük harf ile parçayı kullanarak tüm metni büyük hale getiren fetch() ürününün "sesli" sürümünü nasıl uygulayabileceğinizi görebilirsiniz. Bu yaklaşımın avantajı, dokümanın tamamının indirilmesini beklemeniz gerekmemesidir. Bu, büyük dosyalarla çalışırken büyük bir fark yaratabilir.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

Aşağıdaki demoda, okunabilir, yazılabilir ve dönüştürme akışları gösterilmektedir. Ayrıca pipeThrough() ve pipeTo() boru zincirlerine dair örnekler içerir ve tee()'yi gösterir. İsterseniz demoyu kendi penceresinde çalıştırabilir veya kaynak kodunu görüntüleyebilirsiniz.

Tarayıcıda kullanılabilen faydalı akışlar

Doğrudan tarayıcıda yerleşik olarak bulunan çok sayıda kullanışlı akış vardır. Bir blob'dan kolayca ReadableStream oluşturabilirsiniz. Blob arayüzünün stream() yöntemi, okuma sonrasında blob içinde yer alan verileri döndüren bir ReadableStream döndürür. Bir File nesnesinin belirli bir Blob türü olduğunu ve bir blobun yapabileceği herhangi bir bağlamda kullanılabileceğini de unutmayın.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode() ve TextEncoder.encode()'un yayın varyantları sırasıyla TextDecoderStream ve TextEncoderStream olarak adlandırılır.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Sırasıyla CompressionStream ve DecompressionStream dönüştürme akışlarıyla dosyaları sıkıştırmak veya sıkıştırılmış dosyaları açmak kolaydır. Aşağıdaki kod örneğinde, Streams spesifikasyonunu nasıl indireceğiniz, doğrudan tarayıcıda nasıl sıkıştıracağınız (gzip) ve sıkıştırılmış dosyayı doğrudan diske nasıl yazacağınız gösterilmektedir.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access API'nin FileSystemWritableFileStream ve deneysel fetch() istek akışları, vahşi yaşamdaki yazılabilir akışlara örnektir.

Serial API, hem okunabilir hem de yazılabilir akışları yoğun bir şekilde kullanır.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Son olarak WebSocketStream API, akışları WebSocket API ile entegre eder.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Faydalı kaynaklar

Teşekkür ederiz

Bu makale, Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley ve Adam Rice tarafından incelendi. Jake Archibald'ın blog yayınları, yayınları anlamama çok yardımcı oldu. Kod örneklerinin bazıları GitHub kullanıcısı @bellbind'in keşiflerinden esinlenmiştir ve metnin bazı bölümleri Akışlar ile ilgili MDN Web Dokümanları'na dayanır. Akışlar Standardı'nın yazarları bu spesifikasyonu yazarken çok iyi bir iş çıkarmış. Unsplash'tan Ryan Lara tarafından oluşturulan lokomotif resim.