Akışlar: Ayrıntılı kılavuz

Streams API ile okunabilir, yazılabilir ve dönüştürülebilir akışları nasıl kullanacağınızı öğrenin.

Streams API, ağ üzerinden alınan veya yerel olarak herhangi bir yöntemle oluşturulan veri akışlarına programatik olarak erişmenize ve bunları JavaScript ile işlemenize olanak tanır. Akış; almak, göndermek veya dönüştürmek istediğiniz kaynağın küçük parçalara ayrılmasını ve ardından bu parçaların parça parça işlenmesini içerir. Web sayfalarında gösterilecek HTML veya videolar gibi öğeler alırken tarayıcıların yaptığı bir işlem olsa da bu özellik, akış içeren fetch özelliği 2015'te kullanıma sunulmadan önce JavaScript'te kullanılabilir durumda değildi.

Önceden, bir tür kaynağı (video, metin dosyası vb.) işlemek istediğinizde, dosyanın tamamını indirmeniz, uygun bir biçimde seri haline getirilmesini beklemeniz ve ardından işlemeniz gerekiyordu. Akışlar JavaScript'in kullanılabildiğinde bu durum değişir. Artık ham verileri istemcide kullanıma sunulduğu kadar kısa sürede ve arabellek, dize veya blob oluşturmaya gerek kalmadan JavaScript ile kademeli olarak işleyebilirsiniz. Bu, bir dizi kullanım alanının kilidini açar. Bunlardan bazılarını aşağıda listeledik:

  • Video efektleri: Okunabilir bir video akışını, gerçek zamanlı olarak efekt uygulayan bir dönüşüm akışı üzerinden bağlama.
  • Veri (açma): Bir dosya akışını seçerek sıkıştıran (açma) bir dönüşüm akışı üzerinden bağlama.
  • Resim kodu çözme: Baytların kodunu bit eşlem verilerine çözen bir dönüşüm akışı ve ardından bit eşlemleri PNG'lere dönüştüren başka bir dönüştürme akışı aracılığıyla bir HTTP yanıt akışı bağlama. Bir Service Worker'ın fetch işleyicisinin içine yüklenirse AVIF gibi yeni resim biçimlerini şeffaf bir şekilde çoklu doldurabilmenizi sağlar.

Tarayıcı desteği

ReadableStream ve WritableStream

Tarayıcı Desteği

  • 43
  • 14
  • 65
  • 10.1

Kaynak

TransformStream

Tarayıcı Desteği

  • 67
  • 79
  • 102
  • 14.1

Kaynak

Temel kavramlar

Çeşitli akış türlerinin ayrıntılarına geçmeden önce bazı temel kavramlardan bahsetmek istiyorum.

Büyük Parça

Yığın, bir akışa yazılan veya bir akıştan okunan tek bir veri parçasıdır. Bu akış herhangi bir türde olabilir. Akışlar, farklı türlerde parçalar bile içerebilir. Çoğu zaman yığın, belirli bir akış için en atomik veri birimi olmaz. Örneğin, bir bayt akışı, tek bayt yerine 16 KiB Uint8Array biriminden oluşan parçalar içerebilir.

Okunabilir akışlar

Okunabilir bir akış, veri kaynağını okuyabildiğinizi gösterir. Başka bir deyişle, veriler okunabilir bir akıştan çıkar. Somut olarak, okunabilir bir akış ReadableStream sınıfının bir örneğidir.

Yazılabilir akışlar

Yazılabilir akış, veriler için yazabileceğiniz bir hedefi temsil eder. Başka bir deyişle, veriler yazılabilir bir akışa aktarılır. Yazılabilir bir akış somut olarak, WritableStream sınıfının bir örneğidir.

Akışları dönüştürme

Bir dönüşüm akışı, bir akış çiftinden oluşur: Yazılabilir tarafı olarak bilinen yazılabilir bir akış ve okunabilir tarafı olarak bilinen okunabilir bir akış. Bunun için gerçek dünyadaki bir metafor, anında bir dilden diğerine çeviri yapan simultane çevirmen olabilir. Dönüşüm akışına özel bir şekilde, yazılabilir tarafa yazmak yeni verilerin okunabilir taraftan okunmak üzere hazırlanmasına neden olur. Özetle, writable özelliği ve readable özelliğine sahip herhangi bir nesne dönüşüm akışı olarak hizmet verebilir. Bununla birlikte, standart TransformStream sınıfı düzgün bir şekilde birbirine dolanmış böyle bir çift oluşturmayı kolaylaştırır.

Boru zincirleri

Akışlar temel olarak birbirine bağlama özelliğiyle kullanılır. Okunabilir bir akış, okunabilir akışın pipeTo() yöntemi kullanılarak doğrudan yazılabilir bir akışa bağlanabilir veya okunabilir akışın pipeThrough() yöntemi kullanılarak önce bir veya daha fazla dönüşüm akışından geçirilebilir. Bu şekilde birbirine bağlanan bir akış kümesi ardışık düzen zinciri olarak adlandırılır.

Ters Basınç

Bir boru zinciri oluşturulduktan sonra, parçaların içinden ne kadar hızlı akması gerektiğiyle ilgili sinyaller yayar. Zincirdeki herhangi bir adım henüz parçaları kabul edemiyorsa bu parça, boru zincirinden geriye doğru bir sinyal yayar ve en sonunda orijinal kaynağa parçalar oluşturmayı o kadar hızlı bir şekilde durdurması söyleninceye kadar devam eder. Bu akışı normalleştirme sürecine karşı basınç denir.

Diş teelemesi

Okunabilir bir akış, tee() yöntemi kullanılarak "T" harfinin şekline göre isimlendirilebilir. Bu işlem akışı kilitleyerek artık doğrudan kullanılamaz hale getirir ancak bağımsız olarak kullanılabilen dallar adı verilen iki yeni akış oluşturur. Yayınlar geri sarılamadığı veya yeniden başlatılamayacağı için başlangıç vuruşu da önem taşır. Bu konu hakkında daha sonra bilgi vereceğiz.

Getirme API'sine yapılan bir çağrıdan gelen okunabilir bir akışın oluşturduğu bir ardışık düzen zinciri. Daha sonra, çıkışı bir dönüşüm akışından geçirilir ve ardından sonuç olarak ortaya çıkan ilk akış için tarayıcıya ve ikinci sonuç için sunulan okunabilir akış için Service Worker önbelleğine gönderilir.
Bora zinciri.

Okunabilir bir akışın işleyişi

Okunabilir bir akış, JavaScript'te temel bir kaynaktan gelen ReadableStream nesnesi tarafından temsil edilen bir veri kaynağıdır. ReadableStream() oluşturucu, belirtilen işleyicilerden okunabilir bir akış nesnesi oluşturur ve döndürür. İki tür temel kaynak vardır:

  • Eriştiğinizde aktarılan kaynaklar size sürekli olarak veri aktarır. Akışa erişimi başlatmak, duraklatmak veya iptal etmek size bağlıdır. Örnekler arasında canlı video akışları, sunucu tarafından gönderilen etkinlikler veya WebSockets yer alır.
  • Pull kaynakları, bağlandıktan sonra bunlardan açıkça veri istemenizi gerektirir. fetch() veya XMLHttpRequest çağrıları aracılığıyla HTTP işlemleri buna örnek olarak gösterilebilir.

Akış verileri, parça adı verilen küçük parçalar halinde sırayla okunur. Akışa yerleştirilen parçaların sıralandığı söylenir. Yani okunmak için bir sırada bekliyorlar. Dahili sıra, henüz okunmamış olan parçaları takip eder.

Sıralama stratejisi, bir akışın dahili sırasının durumuna göre geri basınç sinyalini nasıl vermesi gerektiğini belirleyen nesnedir. Sıraya ekleme stratejisi, her parçaya bir boyut atar ve sıradaki tüm parçaların toplam boyutunu, yüksek su işareti olarak bilinen belirli bir sayıyla karşılaştırır.

Akış içindeki parçalar bir okuyucu tarafından okunur. Bu okuyucu, verileri tek seferde bir parça olarak alarak üzerinde istediğiniz işlemi gerçekleştirmenize imkan tanır. Okuyucu ve onunla birlikte gelen diğer işleme kodu tüketici olarak adlandırılır.

Bu bağlamdaki bir sonraki yapı denetleyici olarak adlandırılır. Okunabilir her akışın, adından da anlaşılacağı gibi akışı kontrol etmenize olanak tanıyan ilişkili bir denetleyicisi vardır.

Bir akışı aynı anda yalnızca bir okuyucu okuyabilir. Bir okuyucu oluşturulduğunda ve bir akışı okumaya başladığında (yani etkin okuyucu olduğunda) akışa kilitlenir. Yayınınızı başka bir okuyucunun okumayı devralmasını istiyorsanız genellikle başka bir şey yapmadan önce ilk okuyucuyu yayınlamanız gerekir (ancak akış başlatma edebilirsiniz).

Okunabilir bir akış oluşturma

Oluşturucuyu ReadableStream() çağırarak okunabilir bir akış oluşturursunuz. Oluşturucu, isteğe bağlı bir bağımsız değişkene sahiptir: underlyingSource. Bu bağımsız değişken, oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntem ve özelliklere sahip bir nesneyi temsil eder.

underlyingSource

Bu işlev için geliştirici tarafından tanımlanan isteğe bağlı yöntemler kullanılabilir:

  • start(controller): Nesne oluşturulduğunda hemen çağrılır. Yöntem, akış kaynağına erişebilir ve akış işlevini ayarlamak için gereken diğer her şeyi yapabilir. Bu işlem eşzamansız olarak yapılacaksa yöntem, başarı veya başarısızlığı işaret etme sözü verebilir. Bu yönteme geçirilen controller parametresi ReadableStreamDefaultController değeridir.
  • pull(controller): Daha fazla parça getirildikçe akışı kontrol etmek için kullanılabilir. Akışın dahili parçalardan oluşan sırası dolu olmadığı sürece, sıra en üst seviyeye ulaşana kadar tekrar tekrar çağrılır. pull() çağrısının sonucu bir taahhütse pull() sözü edilene kadar tekrar çağrılmaz. Vaat reddedilirse akışta hata oluşur.
  • cancel(reason): Akış tüketicisi akışı iptal ettiğinde çağrılır.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController aşağıdaki yöntemleri destekler:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Aynı şekilde isteğe bağlı olarak ReadableStream() kurucusunun ikinci bağımsız değişkeni queuingStrategy bağımsız değişkenidir. Bu, akış için isteğe bağlı olarak bir sıraya koyma stratejisi tanımlayan ve iki parametre alan bir nesnedir:

  • highWaterMark: Bu sıralama stratejisini kullanan akışın yüksek su işaretini belirten negatif olmayan bir sayı.
  • size(chunk): Belirli bir parçanın negatif olmayan sonlu boyutunu hesaplayıp döndüren bir işlev. Sonuç, uygun ReadableStreamDefaultController.desiredSize özelliği üzerinden hissedilen, karşı baskıyı belirlemek için kullanılır. Ayrıca, temel kaynağın pull() yönteminin ne zaman çağrıldığını da yönetir.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader() ve read() yöntemleri

Okunabilir bir akıştan okumak için ReadableStreamDefaultReader boyutunda okuyucuya ihtiyacınız vardır. ReadableStream arayüzünün getReader() yöntemi, bir okuyucu oluşturur ve akışı buna kilitler. Akış kilitli durumdayken, bu kitap yayınlanana kadar başka okuyucu edinilemez.

ReadableStreamDefaultReader arayüzünün read() yöntemi, akışın dahili sırasındaki bir sonraki parçaya erişim sağlama sözü döndürür. Akışın durumuna bağlı olarak bir sonuçla isteği karşılar veya reddeder. Farklı olasılıklar şunlardır:

  • Parça varsa sözü şu biçimdeki bir nesne ile yerine getirilecektir:
    { value: chunk, done: false }.
  • Akış kapatılırsa taahhüt şu biçimde bir nesne ile yerine getirilir:
    { value: undefined, done: true }.
  • Akışta hata oluşursa söz konusu hatayla birlikte reddedilir.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked özelliği

Okunabilir bir akışın kilitli olup olmadığını kontrol etmek için ReadableStream.locked mülküne erişebilirsiniz.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Okunabilir akış kodu örnekleri

Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir. İlk olarak, underlyingSource bağımsız değişkeninde (yani TimestampSource sınıfı) bir start() yöntemini tanımlayan bir ReadableStream oluşturursunuz. Bu yöntem, on saniyede bir, akışın controller öğesine enqueue() zaman damgası bildirir. Son olarak da kumandaya akışı close() talimatı verir. getReader() yöntemiyle bir okuyucu oluşturup akış done olana kadar read() yöntemini çağırarak bu akışı kullanırsınız.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Eşzamansız yineleme

Akışın done olup olmadığını her read() döngüsü yinelemesini kontrol etmek en uygun API olmayabilir. Neyse ki yakında bunu yapmanın daha iyi bir yolu olacak: eşzamansız yineleme.

for await (const chunk of stream) {
  console.log(chunk);
}

Günümüzde eşzamansız yinelemeyi kullanmaya yönelik geçici bir çözüm, davranışı bir çoklu dolgu ile uygulamaktır.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Okunabilir bir akış videosu sunma

ReadableStream arayüzünün tee() yöntemi mevcut okunabilir akışı devreye sokar ve sonuçta ortaya çıkan iki dalı içeren iki öğeli bir diziyi yeni ReadableStream örnekleri olarak döndürür. Bu, iki okuyucunun bir akışı aynı anda okumasına olanak tanır. Örneğin, sunucudan bir yanıt alıp tarayıcıya aktarmak, ancak aynı zamanda hizmeti Service Worker önbelleğine de aktarmak istiyorsanız bir Service Worker'da bunu yapabilirsiniz. Yanıt gövdesi bir defadan fazla kullanılamadığından bunu yapmak için iki kopyaya ihtiyacınız vardır. Akışı iptal etmek için sonuçta ortaya çıkan her iki dalı da iptal etmeniz gerekir. Bir akış için bağlantı oluşturduğunuzda, akış genellikle bu süre boyunca kilitlenir ve diğer okuyucular tarafından kilitlenmesi engellenir.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Okunabilir bayt akışları

Baytları temsil eden akışlarda, baytları verimli bir şekilde işlemek için özellikle kopyaları en aza indirerek okunabilir akışın genişletilmiş bir sürümü sağlanır. Bayt akışları, kendi arabelleğinizi getirme (BYOB) okuyucuların edinilmesini sağlar. Varsayılan uygulama, WebSockets'te dizeler veya dizi arabellekleri gibi farklı çıkışlar sağlayabilirken bayt akışları, bayt çıkışını garanti eder. Buna ek olarak, BYOB okuyucuların kararlılık avantajları vardır. Çünkü bir tamponun ayrılması durumunda tamponun aynı tampona iki kez yazılmaması sağlanır ve böylece yarış koşulları önlenir. BYOB okuyucuları, tarayıcının arabellekleri yeniden kullanabildiği için çöp toplama işlemi çalıştırma sayısını azaltabilir.

Okunabilir bir bayt akışı oluşturma

ReadableStream() oluşturucuya ek bir type parametresi ileterek okunabilir bir bayt akışı oluşturabilirsiniz.

new ReadableStream({ type: 'bytes' });

underlyingSource

Okunabilir bir bayt akışının temel kaynağına, değişiklik yapması için bir ReadableByteStreamController verilir. ReadableByteStreamController.enqueue() yöntemi, değeri ArrayBufferView olan bir chunk bağımsız değişkeni alır. ReadableByteStreamController.byobRequest özelliği, geçerli BYOB pull isteğini veya hiç yoksa null döndürür. Son olarak ReadableByteStreamController.desiredSize özelliği, kontrollü akışın dahili sırasını doldurmak için istenen boyutu döndürür.

queuingStrategy

Aynı şekilde isteğe bağlı olarak ReadableStream() kurucusunun ikinci bağımsız değişkeni queuingStrategy bağımsız değişkenidir. Bu, akış için isteğe bağlı olarak bir sıraya koyma stratejisi tanımlayan ve bir parametre alan nesnedir:

  • highWaterMark: Bu sıralama stratejisini kullanan akışın yüksek su işaretini belirten negatif olmayan bir bayt sayısı. Bu değer, karşı baskıyı belirlemek için kullanılır ve kendini uygun ReadableByteStreamController.desiredSize özelliği aracılığıyla gösterir. Ayrıca, temel kaynağın pull() yönteminin ne zaman çağrıldığını da yönetir.

getReader() ve read() yöntemleri

Daha sonra, mode parametresini uygun şekilde ayarlayarak ReadableStreamBYOBReader için erişim elde edebilirsiniz: ReadableStream.getReader({ mode: "byob" }). Bu, kopyaları önlemek için tampon ayırma üzerinde daha hassas bir kontrol sağlar. Bayt akışından okumak için ReadableStreamBYOBReader.read(view) işlevini çağırmanız gerekir. Burada view, ArrayBufferView değeridir.

Okunabilir bayt akışı kod örneği

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Aşağıdaki işlev, rastgele oluşturulmuş bir dizinin sıfır kopyayla verimli bir şekilde okunmasını sağlayan okunabilir bayt akışları döndürür. Önceden belirlenmiş 1.024 yığın boyutunu kullanmak yerine, tam kontrol sağlamak için geliştirici tarafından sağlanan arabelleği doldurmaya çalışır.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Yazılabilir akışın mekaniği

Yazılabilir akış, JavaScript'te WritableStream nesnesiyle gösterilen veri yazabileceğiniz bir hedeftir. Bu, alttaki havuzun (ham verilerin yazıldığı daha düşük seviyeli bir G/Ç havuzu) üzerinde bir soyutlama işlevi görür.

Veriler, her defasında bir parça olmak üzere yazar aracılığıyla akışa yazılır. Bölümler, tıpkı bir okuyucudaki parçalar gibi çok çeşitli biçimlerde olabilir. Yazmaya hazır parçaları üretmek için istediğiniz kodu kullanabilirsiniz. Yazar ve ilişkili kod yapımcı olarak adlandırılır.

Bir yazar oluşturulduğunda ve bir akışa (etkin yazar) yazmaya başladığında onun bu içeriğe bağlı olduğu söylenir. Yazılabilir bir akışa aynı anda yalnızca bir yazar yazabilir. Yayınınıza başka bir yazarın yazmaya başlamasını istiyorsanız genellikle başka bir yazar eklemeden önce onu yayınlamanız gerekir.

Dahili sıra, akışa yazılan ancak henüz temel havuz tarafından işlenmemiş olan parçaları takip eder.

Sıralama stratejisi, bir akışın dahili sırasının durumuna göre geri basınç sinyalini nasıl vermesi gerektiğini belirleyen nesnedir. Sıraya ekleme stratejisi, her parçaya bir boyut atar ve sıradaki tüm parçaların toplam boyutunu, yüksek su işareti olarak bilinen belirli bir sayıyla karşılaştırır.

Nihai yapıya denetleyici adı verilir. Her yazılabilir akışın, akışı kontrol etmenize (örneğin, iptal etmek) olanak tanıyan ilişkili bir denetleyicisi vardır.

Yazılabilir bir akış oluşturma

Streams API'nin WritableStream arayüzü, akış verilerini havuz olarak bilinen bir hedefe yazmak için standart bir soyutlama sağlar. Bu nesne, yerleşik karşı basınç ve sıralama içerir. Oluşturucuyu WritableStream() çağırarak yazılabilir bir akış oluşturursunuz. İsteğe bağlı underlyingSink parametresi bulunur. Bu parametre, oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntemler ve özelliklere sahip bir nesneyi temsil eder.

underlyingSink

underlyingSink, geliştirici tarafından tanımlanan aşağıdaki isteğe bağlı yöntemleri içerebilir. Yöntemlerden bazılarına iletilen controller parametresi bir WritableStreamDefaultController değeridir.

  • start(controller): Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Bu yöntemin içeriği, altta yatan havuza erişim sağlamayı amaçlamalıdır. Bu süreç eşzamansız olarak yapılacaksa başarı veya başarısızlık sinyali sağlayabilir.
  • write(chunk, controller): Bu yöntem, yeni bir veri parçası (chunk parametresinde belirtilir) temel havuza yazılmaya hazır olduğunda çağrılır. Yazma işleminin başarılı veya başarısız olduğunu işaret eden bir söz verebilir. Bu yöntem, yalnızca önceki yazmalar başarılı olduktan sonra çağrılır ve akış kapatıldıktan veya iptal edildikten sonra hiçbir zaman çağrılmaz.
  • close(controller): Bu yöntem, uygulama, akışa parça yazmayı tamamladığını bildirirse çağrılır. İçerikler, alttaki havuza yazma işlemlerini sonlandırmak ve buna erişimi serbest bırakmak için gereken işlemi yapmalıdır. Bu süreç eşzamansızsa başarı veya başarısızlık sinyalini verebilir. Bu yöntem yalnızca sıraya alınan tüm yazma işlemleri başarılı olduktan sonra çağrılır.
  • abort(reason): Bu yöntem, uygulama, akışı aniden kapatmak ve hatalı bir duruma sokmak istediğini belirtirse çağrılır. close() gibi bekletilen kaynakları temizleyebilir ancak yazmalar sıraya alınmış olsa bile abort() çağrılır. Bu parçalar atlanacak. Bu süreç eşzamansızsa başarı veya başarısızlık sinyali sağlayabilir. reason parametresi, akışın neden iptal edildiğini açıklayan bir DOMString içerir.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API'nin WritableStreamDefaultController arayüzü, yazma için veya yazmanın sonunda daha fazla parça gönderildiğinden kurulum sırasında WritableStream durumunun kontrol edilmesini sağlayan bir denetleyiciyi temsil eder. WritableStream oluştururken altta yatan havuza değiştirebileceği karşılık gelen bir WritableStreamDefaultController örneği verilir. WritableStreamDefaultController öğesinin tek bir yöntemi vardır: WritableStreamDefaultController.error(). Bu yöntem, ilişkili akışla gelecekte yapılacak etkileşimlerde hataya neden olur. WritableStreamDefaultController, AbortSignal örneğini döndüren ve gerektiğinde WritableStream işleminin durdurulmasına izin veren signal özelliğini de destekler.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Aynı şekilde isteğe bağlı olarak WritableStream() kurucusunun ikinci bağımsız değişkeni queuingStrategy bağımsız değişkenidir. Bu, akış için isteğe bağlı olarak bir sıraya koyma stratejisi tanımlayan ve iki parametre alan bir nesnedir:

  • highWaterMark: Bu sıralama stratejisini kullanan akışın yüksek su işaretini belirten negatif olmayan bir sayı.
  • size(chunk): Belirli bir parçanın negatif olmayan sonlu boyutunu hesaplayıp döndüren bir işlev. Sonuç, uygun WritableStreamDefaultWriter.desiredSize özelliği üzerinden hissedilen, karşı baskıyı belirlemek için kullanılır.

getWriter() ve write() yöntemleri

Yazılabilir bir akışa yazmak için WritableStreamDefaultWriter türünde bir yazara ihtiyacınız vardır. WritableStream arayüzünün getWriter() yöntemi, yeni WritableStreamDefaultWriter örneğini döndürür ve akışı bu örneğe kilitler. Yayın kilitliyken, geçerli yayın yayınlanana kadar başka yazar edinilemez.

WritableStreamDefaultWriter arayüzünün write() yöntemi, geçirilen bir veri parçasını WritableStream'a ve onun altındaki havuza yazar, ardından yazma işleminin başarılı veya başarısız olduğunu belirten bir vaat döndürür. "Başarı"nın ne anlama geldiğinin temelde olduğunu unutmayın. Bu, parçanın kabul edildiğini ve nihai hedefine güvenli bir şekilde kaydedildiğini gösterebilir.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked özelliği

Yazılabilir bir akışın kilitli olup olmadığını kontrol etmek için ilgili akışın WritableStream.locked özelliğine erişebilirsiniz.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Yazılabilir akış kodu örneği

Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Okunabilir bir akışı yazılabilir bir akışa bağlama

Okunabilir bir akış, okunabilir akışın pipeTo() yöntemi aracılığıyla yazılabilir bir akışa aktarılabilir. ReadableStream.pipeTo(), mevcut ReadableStream değerini belirli bir WritableStream öğesine bağlar ve bağlantı işlemi başarıyla tamamlandığında yerine getirilecek veya hatalarla karşılaşılırsa reddedilecek bir söz döndürür.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Dönüşüm akışı oluşturma

Streams API'nin TransformStream arayüzü, dönüştürülebilir bir veri kümesini temsil eder. Dönüşüm akışı oluşturmak için kurucuyu TransformStream() çağırın, bu işlem belirtilen işleyicilerden bir dönüşüm akışı nesnesi oluşturur ve döndürür. TransformStream() oluşturucusu, ilk bağımsız değişkeni olarak transformer öğesini temsil eden isteğe bağlı bir JavaScript nesnesini kabul eder. Bu tür nesneler, aşağıdaki yöntemlerin herhangi birini içerebilir:

transformer

  • start(controller): Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Bu, genellikle controller.enqueue() kullanılarak ön ek parçalarını sıraya koymak için kullanılır. Bu parçalar, okunabilir taraftan okunur ancak yazılabilir tarafa herhangi bir yazma işlemi gerektirmez. Bu başlangıç işlemi eşzamansızsa (örneğin, ön ek parçalarını edinmek için biraz çaba sarf etmesi nedeniyle) işlev, başarı veya başarısızlık sinyalini verecek bir söz verebilir; reddedilen bir söz ise akışta hataya neden olur. Atılan istisnalar, TransformStream() oluşturucusu tarafından yeniden atanır.
  • transform(chunk, controller): Bu yöntem, başlangıçta yazılabilir tarafa yazılan yeni bir parça dönüştürülmeye hazır olduğunda çağrılır. Akış uygulaması, bu işlevin yalnızca önceki dönüşümler başarılı olduktan sonra çağrılacağını ve start() tamamlanmadan veya flush() çağrıldıktan sonra hiçbir zaman çağrılacağını garanti eder. Bu işlev, dönüşüm akışının asıl dönüştürme işlemini gerçekleştirir. controller.enqueue() işlevini kullanarak sonuçları sıraya alabilir. Bu, yazılabilir tarafa yazılan tek bir parçanın, controller.enqueue() işlevinin çağrılma sayısına bağlı olarak okunabilir tarafta sıfır veya birden fazla parçayla sonuçlanmasına izin verir. Dönüşüm süreci eşzamansızsa bu işlev, dönüşümün başarılı veya başarısız olduğunun sinyalini verecek bir vaat getirebilir. Reddedilen taahhüt, dönüşüm akışının hem okunabilir hem de yazılabilir taraflarında hataya neden olur. transform() yöntemi sağlanmazsa kimlik dönüşümü kullanılır. Bu işlem, parçaları yazılabilir taraftan okunabilir tarafa doğru sıralar.
  • flush(controller): Bu yöntem, yazılabilir tarafa yazılan tüm parçalar transform() içinden başarılı bir şekilde geçirilerek dönüştürüldükten ve yazılabilir taraf kapatılmak üzereyken çağrılır. Tipik olarak bu, sonek parçaları kapanmadan önce okunabilecek tarafı kuyruğa almak için kullanılır. Temizleme işlemi eşzamansızsa işlev, başarılı veya başarısız olduğunu işaret etmek için bir vaat döndürebilir. Sonuç, stream.writable.write() çağrısını yapana bildirilir. Buna ek olarak, reddedilen bir taahhüt, akışın hem okunabilir hem de yazılabilir taraflarında hataya neden olur. İstisna vermek, reddedilen bir sözü döndürmekle aynı şekilde değerlendirilir.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategy ve readableStrategy sıralama stratejileri

TransformStream() oluşturucunun ikinci ve üçüncü isteğe bağlı parametreleri, isteğe bağlı writableStrategy ve readableStrategy sıralama stratejileridir. Bunlar, sırasıyla okunabilir ve yazılabilir akış bölümlerinde özetlendiği şekilde tanımlanır.

Dönüşüm akışı kod örneği

Aşağıdaki kod örneğinde basit bir dönüşüm akışını çalışırken görebilirsiniz.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Bir dönüşüm akışı üzerinden okunabilir bir akışı bağlama

ReadableStream arayüzünün pipeThrough() yöntemi, mevcut akışı bir dönüşüm akışı veya başka bir yazılabilir/okunabilir çift aracılığıyla bağlamak için zincirlenebilir bir yol sağlar. Bir akışın ardışık düzeni, genellikle ardışık düzen süresince kilitleyerek diğer okuyucuların onu kilitlemesini önler.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Sıradaki kod örneğinde (biraz düşünülmüş), döndürülen yanıt sözünü akış olarak kullanıp yığına göre büyük harf kullanarak tüm metni büyük harfle yapan bir fetch() "bağırma" sürümünü nasıl uygulayabileceğiniz gösterilmektedir. Bu yaklaşımın avantajı, tüm belgenin indirilmesini beklemenizin gerekmemesidir. Bu da büyük dosyalarla çalışırken büyük bir fark yaratabilir.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demografi

Aşağıdaki demoda okunabilir, yazılabilir ve dönüştürülebilir akışlar gösterilmektedir. Ayrıca, pipeThrough() ve pipeTo() boru zinciri örneklerini ve tee() değerlerini de göstermektedir. İsteğe bağlı olarak demoyu kendi penceresinde çalıştırabilir veya kaynak kodu görüntüleyebilirsiniz.

Tarayıcıda faydalı yayınlar var

Tarayıcıda yerleşik olarak bulunan birçok faydalı akış vardır. Bir blob'tan kolayca ReadableStream oluşturabilirsiniz. Blob arayüzünün stream() yöntemi, okuma sonrasında blob içinde bulunan verileri döndüren bir ReadableStream döndürür. Ayrıca File nesnesinin belirli bir Blob türü olduğunu ve bir blob'un yapabileceği her bağlamda kullanılabileceğini unutmayın.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode() ve TextEncoder.encode() akış varyantları sırasıyla TextDecoderStream ve TextEncoderStream olarak adlandırılır.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

CompressionStream ve DecompressionStream dönüşüm akışlarıyla dosyaları sırayla sıkıştırmak veya açmak artık çok kolay. Aşağıdaki kod örneğinde, Akışlar spesifikasyonunu nasıl indirebileceğiniz, tarayıcı içinde nasıl sıkıştırabileceğiniz (gzip) ve sıkıştırılmış dosyayı doğrudan diske nasıl yazabileceğiniz gösterilmektedir.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access API'nin FileSystemWritableFileStream ve deneysel fetch() istek akışları, vahşi ortamdaki yazılabilir akışlara örnektir.

Serial API, hem okunabilir hem de yazılabilir akışları yoğun şekilde kullanır.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Son olarak WebSocketStream API, akışları WebSocket API ile entegre eder.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Faydalı kaynaklar

Teşekkür

Bu makale Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley ve Adam Rice tarafından incelenmiştir. Jake Archibald'ın blog yayınları akışları anlamama çok yardımcı oldu. Kod örneklerinden bazıları, GitHub kullanıcısının @bellbind adlı kullanıcının keşiflerinden esinlenilerek yazı tipinin ağırlıklı olarak Akışlar üzerindeki MDN Web Dokümanları'na dayanan parçalarıdır. Streams Standard'ın yazarları bu spesifikasyonu yazma konusunda büyük bir emek harcadı. Ryan Lara'nın Unsplash'teki hero resmi.