Stream API 让你能够用代码方式处理通过网络获取或者在本地以各种方式产生的数据流,并且用 JavaScript 进行处理。这种流处理就是把你想接收、传送或更改的资源分解成小片段,然后逐个处理这些片段。虽然浏览器在接收用于展示在网页上的资料,比如HTML或视频时,本来就会进行流处理,但在 2015 年引入带有流处理功能的 fetch
之前,JavaScript 并没有这个能力。
NOTE
虽然 XMLHttpRequest
技术上允许流媒体传输,但是实现起来并不优雅。
在过去,无论你要处理的是视频、文本文件还是其他类型的资源,你都需要先下载整个文件,等待它被反序列化成一个合适的格式,然后再进行进一步的处理。但是,随着流功能在 JavaScript 中的应用,这个处理方式有了根本性的改变。现在,一旦客户端获取到原始数据,你就可以立即使用 JavaScript 进行逐步处理,无需额外生成缓冲、字符串或 blob。这一新的处理方式开启了一系列新的应用场景,例如:
- 视频特效: 实时通过转换流对可读的视频流进行特效处理。
- 数据 (解)压缩: 通过转换流对文件流进行选择性 (解)压缩处理。
- 图像解码: 通过转换流将 HTTP 响应流转为位图数据,然后通过另一个转换流将位图翻译成 PNG 格式。如果这个过程被放到 service worker 的
fetch
处理函数中,则可以支持像 AVIF 这样的新图像格式。
浏览器支持
推荐在 caniuse 查询 ReadableStream
、WritableStream
、TransformStream
的可用情况。
NOTE
符合要求的浏览器想要使用流可以借助兼容库(Polyfill)。建议只有在内置功能不可用的时候才按需加载这个兼容库。
核心概念
在深入分析各种类型的流之前,我们先来了解一些核心概念。
数据块
在流的世界里,我们用数据块这个词来表示写入或从流中读取的单个数据单元。数据块可以是任何类型的,甚至一个流可能包含多种类型的数据块。通常,数据块并非流中的最小数据单位。例如,一个字节流中的数据块可能是由包含 16KiB Uint8Array
单元,而非单一字节。
可读流
可读流是一个你能从中读取数据的数据源。也就是说,数据从一个可读流中产出。具体说来,可读流就是 ReadableStream
类的一个实例。
可写流
可写流则是一个你能写入数据的目的地。也就是说,数据会进入一个可写流中。具体说来,可写流就是 WritableStream
类的一个实例。
转换流
转换流由一对流组成:一个可写流(我们叫它可写端),和一个可读流(我们叫它可读端)。你可以把它想象成一个同声传译——他们能够实时地把一种语言翻译成另一种语言。对于转换流,向其可写端写入数据会导致新的数据在可读端可供读取。具体来讲,任何具有 writable
和 readable
属性的对象都能作为一个转换流。但是,标准的 TransformStream
类能让创建此类一对互相关联的流更为便捷。
管道链
我们主要是通过管道方式来使用流的。一个可读流可以直接与一个可写流之间建立管道,通过调用可读流的 pipeTo()
方法,或者可以通过一个或多个转换流建立管道,使用的是可读流的 pipeThrough()
方法。被管道连接的流集合就被称为管道链。
背压
当管道链建立后,流就会开始传递关于数据块应该以怎样的速度流动的信号。链中的任何环节如果暂时无法接收数据块,就会向上游传递一个信号,直至源头,告诉源头减慢产出速度。这一过程,我们称之为“背压”,旨在调控流速。
分流
可读流可以使用 tee()
方法进行“分流”(这个词得名于大写字母 T 的形状)。分流会锁定原始流,使之无法直接使用,但同时会创建两个新的流,称为分支,它们可以分别被消耗。分流功能非常重要,因为流一旦被消耗就不能倒回或重启,我们稍后会更详细地讨论这个话题。
可读流的原理
可读流是一个数据源,它在 JavaScript 中被 ReadableStream
对象所代表,数据源从 underlying source(底层数据源)以流的方式传出。通过 ReadableStream()
构造函数,我们可以从给定的处理器中创建并返回一个可读流对象。底层数据源有两种类型:
- 推送型源头 会在你访问它们时主动向你推送数据,你需要自行决定开始、暂停或取消接收流。典型的例子包括实时视频流、服务器发送的事件或 WebSocket。
- 拉取型源头 则需要你在连接后,主动向它们请求数据。例如通过
fetch()
或XMLHttpRequest
做的 HTTP 请求就属于这种类型。
流中的数据是以小单位称为块顺序读取的。被放入流中的块被称为入队,也就是说它们正在队列中等待被读取。一个内部队列负责跟踪还未被读取的块。
队列策略是决定流如何根据其内部队列状态发出背压信号的对象。队列策略给每个块指定一个大小,并将队列中所有块的总大小与一个被称为高水位线的特定数值进行比较。
流中的块通过读取器来读取。读取器一次读出一个块,你可以针对每个块进行你想做的操作。读取器,加上它配套的处理代码,统称为消费者。
接下来我们会谈到的结构是控制器。每个可读流都有一个相关的控制器,顾名思义,它让你可以控制流。
每次只能有一个读取器来读取流;当读取器被创建并开始读取流时,它成为了活跃读取器,被锁定在那个流上。如果你想让另一个读取器接管流,你通常需要先释放第一个读取器(不过你可以使用分流)。
创建可读流
通过调用构造函数 ReadableStream()
来创建一个可读流。该构造函数有一个可选参数 underlyingSource
,它是一个包含方法和属性的对象,用来定义构建的流实例将如何运作。
underlyingSource
底层数据源(underlying source)有以下几个可选方法:
start(controller)
:该方法在对象被构造时立即被调用。它可以访问流的源,并进行所有必要的流功能设置。如果此过程是异步的,这个方法可以返回一个 promise ,表示操作是否成功。传递给此方法的controller
参数是一个ReadableStreamDefaultController
对象。pull(controller)
:当需要获取更多的块以控制流时,可以使用该方法。只要流内部的块队列未满,就会反复调用这个方法,直至队列达到其高水位线。如果调用pull()
返回的是一个 promise,那么在该 promise 解决之前,不会再次调用pull()
。如果 promise 被拒绝,流会进入错误状态。cancel(reason)
:当流的消费者取消流时调用此方法。
const readableStream = new ReadableStream({ start(controller) { /* … */ },
pull(controller) { /* … */ },
cancel(reason) { /* … */ },});
ReadableStreamDefaultController
提供了以下方法:
ReadableStreamDefaultController.close()
这个方法用来关闭关联的流。ReadableStreamDefaultController.enqueue()
在关联的流中入队一个指定的数据块。ReadableStreamDefaultController.error()
该方法会导致与关联流的任何未来交互都会触发错误。
/* … */start(controller) { controller.enqueue('The first chunk!');},/* … */
queuingStrategy
ReadableStream()
构造函数的第二个参数(同样也是可选的)是 queuingStrategy
。这是一个对象,它可以为流定义队列策略,接受两个参数:
highWaterMark
:一个非负数,表示使用这个队列策略的流的高水位线。size(chunk)
:这是一个函数,它计算并返回给定数据块的有限非负大小。这个结果用来确定背压,这个背压通过ReadableStreamDefaultController.desiredSize
属性来传递。它还决定了底层数据源的pull()
方法何时被调用。
const readableStream = new ReadableStream( { /* … */ }, { highWaterMark: 10, size(chunk) { return chunk.length; }, },);
NOTE
你可以自己定义 queuingStrategy
,或者将 ByteLengthQueuingStrategy
或 CountQueuingStrategy
的一个实例作为此对象的值。如果没有提供 queuingStrategy
,那么默认值将会是 CountQueuingStrategy
且 highWaterMark
的值为 1
。
getReader()
和 read()
方法
要读取可读流,你需要一个读取器 ReadableStreamDefaultReader
。ReadableStream
接口的 getReader()
方法会创建一个读取器,并将流锁定到它。在流被锁定的时候,不能获取其他读取器,除非这个被释放。
ReadableStreamDefaultReader
接口的 read()
方法返回 promise,它可以让你访问流内部队列中的下一个数据块。它会根据流的状态返回结果或拒绝。不同的情况如下:
- 如果一个数据块可用,promise 会用 fulfilled 一个形式为
{ value: chunk, done: false }
的对象。 - 如果流被关闭,promise 会用 fulfilled 一个形式为
{ value: undefined, done: true }
的对象。 - 如果流有错误,promise 会被对应的错误 rejected。
const reader = readableStream.getReader();while (true) { const { done, value } = await reader.read(); if (done) { console.log("The stream is done."); break; } console.log("Just read a chunk:", value);}
locked
属性
你可以通过访问其 ReadableStream.locked
属性来检查一个可读流是否被锁定。
const locked = readableStream.locked;console.log(`The stream is ${locked ? "indeed" : "not"} locked.`);
可读流示例
下面的代码示例显示了所有的操作步骤。你首先创建一个 ReadableStream
,在它的 underlyingSource
参数(也就是 TimestampSource
类)中定义了一个 start()
方法。这个方法设置流的 controller
在十秒内每秒 enqueue()
一次时间截。最后指示 controller close()
流。你可以通过 getReader()
方法创建读取器,并且持续调用 read()
来消费这个流,直到流被完成(done
)。
class TimestampSource { #interval;
start(controller) { this.#interval = setInterval(() => { const string = new Date().toLocaleTimeString(); // Add the string to the stream. controller.enqueue(string); console.log(`Enqueued ${string}`); }, 1_000);
setTimeout(() => { clearInterval(this.#interval); // Close the stream after 10s. controller.close(); }, 10_000); }
cancel() { // This is called if the reader cancels. clearInterval(this.#interval); }}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) { let result = ""; const reader = stream.getReader(); while (true) { // The `read()` method returns a promise that // resolves when a value has been received. const { done, value } = await reader.read(); // Result objects contain two properties: // `done` - `true` if the stream has already given you all its data. // `value` - Some data. Always `undefined` when `done` is `true`. if (done) return result; result += value; console.log(`Read ${result.length} characters so far`); console.log(`Most recently read chunk: ${value}`); }}concatStringStream(stream).then((result) => console.log("Stream complete", result));
异步迭代
在每一次 read()
循环迭代时检查流是否已经完成(done
)可能不是最方便的API。幸运的是,我们很快就能有一种更好的方法来处理这个问题:异步迭代。
for await (const chunk of stream) { console.log(chunk);}
CAUTION
(ReadableStream
的)异步迭代还没有在任何浏览器中实现。
目前可以使用 polyfill 实现这个行为。
if (!ReadableStream.prototype[Symbol.asyncIterator]) { ReadableStream.prototype[Symbol.asyncIterator] = async function* () { const reader = this.getReader(); try { while (true) { const { done, value } = await reader.read(); if (done) { return; } yield value; } } finally { reader.releaseLock(); } };}
对可读流进行分流
tee()
是 ReadableStream
接口的一个方法,其功能是将正在操作的可读流分出两个分支,返回的结果是一个包含两个 ReadableStream
实例的数组。这样我们就可以让两个不同的读取器同时读取同一个流。举个具体的例子,在 service worker 中,如果我们想从服务器获取数据并实时地把数据传送给浏览器,同时还需要将这份数据流存入 service worker 的缓存中,我们就可以用到 tee()
方法。因为一份响应数据只能被读取一次,所以我们需要通过 tee()
方法来创建两份相同的数据流。如果要终止数据流,那就需要同时取消这两个分支的读取。一般来说,对数据流进行分流操作会将其锁定,阻止其他读取器对其进行操作。
const readableStream = new ReadableStream({ start(controller) { // Called by constructor. console.log("[start]"); controller.enqueue("a"); controller.enqueue("b"); controller.enqueue("c"); }, pull(controller) { // Called `read()` when the controller's queue is empty. console.log("[pull]"); controller.enqueue("d"); controller.close(); }, cancel(reason) { // Called when the stream is canceled. console.log("[cancel]", reason); },});
// Create two `ReadableStream`s.const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you// would not do it this way, but you certainly can.const readerA = streamA.getReader();console.log("[A]", await readerA.read()); //=> {value: "a", done: false}console.log("[A]", await readerA.read()); //=> {value: "b", done: false}console.log("[A]", await readerA.read()); //=> {value: "c", done: false}console.log("[A]", await readerA.read()); //=> {value: "d", done: false}console.log("[A]", await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way// to read data from the stream.const readerB = streamB.getReader();while (true) { const result = await readerB.read(); if (result.done) break; console.log("[B]", result);}
可读字节流
对于以字节形式表示的流,我们提供了一个可读流的扩展版本,它能以更有效的方式处理字节,特别是在尽可能减少复制的情况下。字节流允许我们获取自定义缓冲区(bring-your-own-buffer,缩写 BYOB)读取器。默认的实现方式可以提供各种不同类型的输出,比如在 WebSocket 的情况下,它可以输出字符串或者数组缓冲区,而字节流则保证输出的都是字节。另外,BYOB 读取器还具有更稳定的优点,当缓冲区发生脱离时,它能保证不会在同一个缓冲区内重复写入,有效避免了产生竞态条件。BYOB 读取器还能减少浏览器运行垃圾收集的次数,因为它能够复用已有的缓冲区。
创建可读字节流
你可以通过在 ReadableStream()
构造函数中添加一个额外的 type
参数,从而创建一个可读的字节流。
new ReadableStream({ type: "bytes" });
underlyingSource
可读字节流的底层数据源通过 ReadableByteStreamController
实现。其中 ReadableByteStreamController.enqueue()
方法会使用一个 chunk
参数,该参数的值为一个 ArrayBufferView
对象。当有 BYOB 请求时,ReadableByteStreamController.byobRequest
属性将返回该请求,否则返回 null
。ReadableByteStreamController.desiredSize
属性描述了控制流的内部队列需要填充的目标大小。
queuingStrategy
ReadableStream()
构造函数的第二个参数,可以选用的 queuingStrategy
,是一个对象,它可以定义流的排队策略,该策略有一个参数:
highWaterMark
:非负数值,表示使用该策略的流的高水位。通过ReadableByteStreamController.desiredSize
属性用来指示背压的大小。同时,它决定了何时调用源头的pull()
方法。
NOTE
与其他类型的流的排队策略不同,对于可读字节流,它的排队策略没有 size(chunk)
函数。每个数据块的大小都由其 byteLength
属性决定。
NOTE
如果没有提供 queuingStrategy
,那么将使用默认策略,其 highWaterMark
的值为 0
。
getReader()
和 read()
方法
你可以通过设置 mode
参数为 “byob” 来获取 ReadableStreamBYOBReader
实例:ReadableStream.getReader({ mode: "byob" })
。这将允许你更精细地控制缓冲区分配以避免复制。若要从字节流中进行读取,你需要调用 ReadableStreamBYOBReader.read(view)
,这里的 view
是 ArrayBufferView
对象。
可读字节流示例
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);const buffer = await readInto(startingAB);console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) { let offset = 0;
while (offset < buffer.byteLength) { const { value: view, done } = await reader.read( new Uint8Array(buffer, offset, buffer.byteLength - offset), ); buffer = view.buffer; if (done) { break; } offset += view.byteLength; }
return buffer;}
以下函数返回一个可读字节流,它可以高效地进行零拷贝读取一个随机生成的数组。它并不是使用预设的 1,024 大小的数据块,而是试图填充开发者提供的缓冲区,以实现完全的控制。
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() { return new ReadableStream({ type: "bytes",
pull(controller) { // Even when the consumer is using the default reader, // the auto-allocation feature allocates a buffer and // passes it to us via `byobRequest`. const view = controller.byobRequest.view; view = crypto.getRandomValues(view); controller.byobRequest.respond(view.byteLength); },
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, });}
可写流的原理
在 JavaScript 中,可写流被表示为 WritableStream
对象,它是一个用于接收并写入数据的目标。这是对 underlying sink(底层数据槽) 的一层抽象,底层数据槽是一个低级别的 I/O 接口,用于写入原始数据。
数据是通过写入器按块进行写入的。这些块可以有多种形式,就像读取器中的块一样。你可以使用任意你喜欢的代码来生成待写入的块;写入器以及其相关代码统称为生产者。
当一个写入器被创建并开始向某个流中写数据时(我们称这样的写入器为活跃写入器),跟该流锁定。在同一时间,只能有一个写入器可以向可写流中写入数据。如果你想让另一个写入器开始向你的流写入数据,你通常需要先解除当前写入器,然后再向流中关联另一个写入器。
内部队列用于追踪已被写入流但尚未被底层数据槽处理的数据块。
排队策略则是一个根据其内部队列状态来决定流如何反馈背压信号的对象。排队策略会为每个数据块分配一个大小,并将队列中所有数据块的累积大小与一个设定的数值(被称为高水位线)进行比较。
最后一个构造部分是控制器。每个可写流都有一个关联的控制器,你可以通过这个控制器来操作流(例如将其中止)。
创建可写流
Stream API 的 WritableStream
接口提供了一种标准化的方式,允许我们将流数据写入到底层数据槽。这个对象具有内置的背压机制和队列管理。我们可以通过调用它的构造函数 WritableStream()
来创建一个可写流。它接受一个可选的 underlyingSink
参数,这个参数代表一个包含各种属性和方法的对象,用于定义流实例的行为。
underlyingSink
底层数据槽(underlying sink)有以下几个可选方法。这些方法接收的 controller
参数是一个 WritableStreamDefaultController
对象。
start(controller)
: 构造对象时,会立即调用此方法。它的主要目的是获取底层数据槽的访问权限。如果此过程是异步的,这个方法返回 promise ,表示操作是否成功。write(chunk, controller)
: 当有新的数据块(由chunk
参数指定)准备写入底层数据槽时,会调用此方法。它返回 promise ,用于表示写入操作是否成功。值得注意的是,这个方法只有在上一次的写入操作成功后才会被调用,而且一旦流被关闭或终止,就不会再被调用。close(controller)
: 当应用程序发出结束写入信号时,会调用此方法。此方法的主要作用是完成对底层数据槽的所有写入,并释放对其的访问。如果此过程是异步的,此方法返回 promise ,表示操作是否成功。要注意,只有当所有待处理的写入操作都成功完成后,才会调用此方法。abort(reason)
: 当应用程序希望立即关闭流并将其标记为错误状态时,会调用此方法。就像close()
方法一样,它会清理所有已分配的资源。但是,即使还存在待处理的写入操作,也会调用abort()
方法,然后将待处理的数据块丢弃。如果此过程是异步的,此方法返回 promise 。并且,reason
参数包含一个描述中止原因的DOMString
。
const writableStream = new WritableStream({ start(controller) { /* … */ },
write(chunk, controller) { /* … */ },
close(controller) { /* … */ },
abort(reason) { /* … */ },});
Stream API 的 WritableStreamDefaultController
接口扮演着一种控制器的角色,可以在设置 WritableStream
的状态、写入更多数据块,或者结束写入的过程中控制 WritableStream
。当我们创建一个 WritableStream
时,对应的 WritableStreamDefaultController
实例将赋值给数据槽,以便于操作。 WritableStreamDefaultController
只有一个方法,那就是 WritableStreamDefaultController.error()
,此方法可以使与关联的流在将来的任何交互中都产生错误。另外,WritableStreamDefaultController
还有一个 signal
属性,返回的是 AbortSignal
的实例,以便在需要时能够停止 WritableStream
的操作。
/* … */write(chunk, controller) { try { // Try to do something dangerous with `chunk`. } catch (error) { controller.error(error.message); }},/* … */
queuingStrategy
WritableStream()
构造函数可选择接受第二个参数——queuingStrategy
。它是一个对象,用以设定流的队列策略,具有两个参数:
highWaterMark
:代表着采用此队列策略的流的高水位,该值必须为非负数。size(chunk)
:此函数负责计算并返回给定数据块的有限非负大小。返回的结果决定背压,可以通过WritableStreamDefaultWriter.desiredSize
查询。
NOTE
你可以自己定义 queuingStrategy
,或者将 ByteLengthQueuingStrategy
或 CountQueuingStrategy
的一个实例作为此对象的值。如果没有提供 queuingStrategy
,那么默认值将会是 CountQueuingStrategy
且 highWaterMark
的值为 1
。
getWriter()
和 write()
方法
如果你想要往可写流中写入数据,你将需要一个叫做 WritableStreamDefaultWriter
的写入器。通过调用 WritableStream
接口的 getWriter()
方法,会得到一个新的 WritableStreamDefaultWriter
实例,并且此流会被锁定到该实例,只有当当前的实例被释放后,才能获取其他写入器。
WritableStreamDefaultWriter
接口的 write()
方法会将一个数据块写入到 WritableStream
及其底层数据槽,然后返回一个 promise 对象以表示写入操作的成功或失败。值得注意的是,“成功”的定义依赖于底层数据槽;它可能表示数据块已被接手,而并不一定意味着数据已被安全地保存到终点。
const writer = writableStream.getWriter();const resultPromise = writer.write("The first chunk!");
locked
属性
你可以通过访问其 WritableStream.locked
属性来检查一个可写流是否被锁定。
const locked = writableStream.locked;console.log(`The stream is ${locked ? "indeed" : "not"} locked.`);
可写流示例
以下代码统合了所有步骤。
const writableStream = new WritableStream({ start(controller) { console.log("[start]"); }, async write(chunk, controller) { console.log("[write]", chunk); // Wait for next write. await new Promise((resolve) => setTimeout(() => { document.body.textContent += chunk; resolve(); }, 1_000), ); }, close(controller) { console.log("[close]"); }, abort(reason) { console.log("[abort]", reason); },});
const writer = writableStream.getWriter();const start = Date.now();for (const char of "abcdefghijklmnopqrstuvwxyz") { // Wait to add to the write queue. await writer.ready; console.log("[ready]", Date.now() - start, "ms"); // The Promise is resolved after the write finishes. writer.write(char);}await writer.close();
可读流管道连接到可写流
我们可以通过可读流的 pipeTo()
方法,将其与可写流进行管道连接。ReadableStream.pipeTo()
方法会将当前的 ReadableStream
连接到指定的 WritableStream
,并返回一个 promise。如果管道连接过程顺利完成,这个 promise 就会被解决;如果过程中遇到任何错误,这个 promise 就会被拒绝。
const readableStream = new ReadableStream({ start(controller) { // Called by constructor. console.log("[start readable]"); controller.enqueue("a"); controller.enqueue("b"); controller.enqueue("c"); }, pull(controller) { // Called when controller's queue is empty. console.log("[pull]"); controller.enqueue("d"); controller.close(); }, cancel(reason) { // Called when the stream is canceled. console.log("[cancel]", reason); },});
const writableStream = new WritableStream({ start(controller) { // Called by constructor console.log("[start writable]"); }, async write(chunk, controller) { // Called upon writer.write() console.log("[write]", chunk); // Wait for next write. await new Promise((resolve) => setTimeout(() => { document.body.textContent += chunk; resolve(); }, 1_000), ); }, close(controller) { console.log("[close]"); }, abort(reason) { console.log("[abort]", reason); },});
await readableStream.pipeTo(writableStream);console.log("[finished]");
创建变换流
TransformStream
是 Stream API 的一个接口,代表着一组可变换的数据。你可以通过调用它的构造函数 TransformStream()
来创建一个变换流,这个构造函数会根据给出的处理器创建并返回一个变换流对象。TransformStream()
构造函数的第一个参数可以是一个可选的 JavaScript 对象,这个对象就是 transformer
。 这个对象可以包含以下的方法:
transformer
start(controller)
:这个方法会在对象构造时立即被调用。它通常用于通过controller.enqueue()
来入队前缀块,这些块会在可读端被读取,但并不依赖于可写端的任何写入。如果这个初始化过程是异步的,比如说需要一些时间来获取前缀块,那么这个函数可以返回一个 promise 来表明初始化的成功或失败;一个被拒绝的 promise 会导致流错误。任何抛出的异常都会被TransformStream()
构造函数重新抛出。transform(chunk, controller)
:当一个新的块原始写入到可写端,并准备被转换时,这个方法会被调用。流的实现保证这个函数只有在之前的变换已经成功,并且start()
已经完成或者flush()
已经被调用之后才会被调用。这个函数负责执行变换流的实际变换工作。它可以通过controller.enqueue()
来入队结果。这意味着一个写入到可写端的块可以在可读端产生零个或多个块,这取决于controller.enqueue()
被调用了多少次。如果变换过程是异步的,这个函数可以返回一个 promise 来表示变换的成功或失败。一个被拒绝的 promise 会导致变换流的读写端都出错。如果没有提供transform()
方法,那么将会使用本体转换,这种转换会将块从可写端不改变地移到可读端。flush(controller)
:在所有写入到可写端的块都已经成功通过transform()
转换,并且可写端即将被关闭时,这个方法会被调用。通常来说,这会在可读端即将关闭前,将后缀块入队到可读端。如果刷新过程是异步的,那么这个函数可以返回一个 promise 来表示刷新的成功或失败;结果会被传给stream.writable.write()
的调用者。此外,一个被拒绝的 promise 将会导致读写端都出错。抛出的异常会被当作返回被拒绝的 promise 来处理。
const transformStream = new TransformStream({ start(controller) { /* … */ },
transform(chunk, controller) { /* … */ },
flush(controller) { /* … */ },});
writableStrategy
与 readableStrategy
队列策略
TransformStream()
构造器的第二和第三参数,即 writableStrategy
与 readableStrategy
队列策略,都是可选的。它们在前面可读可写流章节中分别介绍过。
转换流代码示例
下方代码展示了如何简单应用转换流。
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.// This example shows how you would have done it before.const textEncoderStream = new TransformStream({ transform(chunk, controller) { console.log("[transform]", chunk); controller.enqueue(new TextEncoder().encode(chunk)); }, flush(controller) { console.log("[flush]"); controller.terminate(); },});
(async () => { const readStream = textEncoderStream.readable; const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter(); for (const char of "abc") { writer.write(char); } writer.close();
const reader = readStream.getReader(); for (let result = await reader.read(); !result.done; result = await reader.read()) { console.log("[value]", result.value); }})();
可读流管道连接到转换流
ReadableStream
接口的 pipeThrough()
方法,提供了一种链式操作,可以通过转换流或其他可写/可读对,实现当前流的管道传输。一般来说,当一个流在进行管道传输时,会被锁定,以防止其他读取器同时对其进行锁定。
const transformStream = new TransformStream({ transform(chunk, controller) { console.log("[transform]", chunk); controller.enqueue(new TextEncoder().encode(chunk)); }, flush(controller) { console.log("[flush]"); controller.terminate(); },});
const readableStream = new ReadableStream({ start(controller) { // called by constructor console.log("[start]"); controller.enqueue("a"); controller.enqueue("b"); controller.enqueue("c"); }, pull(controller) { // called read when controller's queue is empty console.log("[pull]"); controller.enqueue("d"); controller.close(); // or controller.error(); }, cancel(reason) { // called when rs.cancel(reason) console.log("[cancel]", reason); },});
(async () => { const reader = readableStream.pipeThrough(transformStream).getReader(); for (let result = await reader.read(); !result.done; result = await reader.read()) { console.log("[value]", result.value); }})();
接下来的代码示例(虽然有点故意为之的感觉)展示了如何实现一个“大叫版”的 fetch()
,它通过将返回的响应 promise 作为一个流进行消费,并逐块将文本转换为大写。这种方式的优势在于,你无需等待整个文档下载完毕,这在处理大型文件时,可以带来显著的效率提升。
function upperCaseStream() { return new TransformStream({ transform(chunk, controller) { controller.enqueue(chunk.toUpperCase()); }, });}
function appendToDOMStream(el) { return new WritableStream({ write(chunk) { el.append(chunk); }, });}
fetch("./lorem-ipsum.txt").then((response) => response.body .pipeThrough(new TextDecoderStream()) .pipeThrough(upperCaseStream()) .pipeTo(appendToDOMStream(document.body)),);
示例
这个示例展示了可读流、可写流和转换流的使用。同时,它也包含了 pipeThrough()
和 pipeTo()
管道链的示例,并展示了 tee()
的用法。你可以选择在新窗口中运行这个示例,或者查看其源代码。
浏览器中实用的流
浏览器内置了众多实用的流。你可以轻松地从一个 blob 对象创建一个 ReadableStream
。Blob
接口的 stream() 方法会返回一个 ReadableStream
,读取该流会返回 blob 对象中的数据。另外,需要注意的是,File
对象是一种特殊的 Blob
,在任何需要使用 blob 的场景中,都可以使用 File
对象。
const readableStream = new Blob(["hello world"], { type: "text/plain" }).stream();
TextDecoder.decode()
和 TextEncoder.encode()
的流式版本被称为 TextDecoderStream
和 TextEncoderStream
。
const response = await fetch("https://streams.spec.whatwg.org/");const decodedStream = response.body.pipeThrough(new TextDecoderStream());
通过 CompressionStream
和 DecompressionStream
转换流,我们能方便的压缩或解压缩文件。下面这段代码示例演示了如何下载 Stream 规范,并在浏览器里进行 gzip 压缩,最后直接将压缩文件存入硬盘。
const response = await fetch("https://streams.spec.whatwg.org/");const readableStream = response.body;const compressedStream = readableStream.pipeThrough(new CompressionStream("gzip"));
const fileHandle = await showSaveFilePicker();const writableStream = await fileHandle.createWritable();compressedStream.pipeTo(writableStream);
文件系统访问 API 中的 FileSystemWritableFileStream
以及实验性的 fetch()
请求流,就是具体运用中的可写流的典型案例。
而 串行 API 就广泛地应用了可读流和可写流。
// Prompt user to select any serial port.const port = await navigator.serial.requestPort();// Wait for the serial port to open.await port.open({ baudRate: 9_600 });const reader = port.readable.getReader();
// Listen to data coming from the serial device.while (true) { const { value, done } = await reader.read(); if (done) { // Allow the serial port to be closed later. reader.releaseLock(); break; } // value is a Uint8Array. console.log(value);}
// Write to the serial port.const writer = port.writable.getWriter();const data = new Uint8Array([104, 101, 108, 108, 111]); // helloawait writer.write(data);// Allow the serial port to be closed later.writer.releaseLock();
最后,WebSocketStream
API 让你可以在 WebSocket API 的基础上实用流。
const wss = new WebSocketStream(WSS_URL);const { readable, writable } = await wss.connection;const reader = readable.getReader();const writer = writable.getWriter();
while (true) { const { value, done } = await reader.read(); if (done) { break; } const result = await process(value); await writer.write(result);}
实用资源
- Streams specification
- Accompanying demos
- Streams polyfill
- 2016—the year of web streams
- Async iterators and generators
- Stream Visualizer