Node.js 流基础
Node.js 本质上是异步和事件驱动的,因此非常擅长处理 I/O 密集型任务。如果你正在开发一个执行 I/O 操作的应用程序,可以利用 Node.js 中提供的流。让我们详细探索流,并了解它们如何简化 I/O。
关键要点
- Node.js 流是异步和事件驱动的,可以通过高效处理较小的、可管理的数据块来简化 I/O 操作。
- 流可以分为可读(Readable)、可写(Writable)、双工(Duplex,既可读又可写)或转换(Transform,在数据通过时修改数据)。
pipe()
函数是 Node.js 流中的一个有用工具,允许从源读取数据并写入目标,而无需手动管理数据流。- 现代 Node.js 提供了诸如
stream.pipeline()
和stream.finished()
等实用工具,以及基于 Promise 的 API,以实现更好的错误处理和流控制。 - 流可以与 async/await 模式一起使用,以实现更清晰、更可维护的代码。
什么是流
Node.js 中的流受到 Unix 管道的启发,并提供了一种机制,以流式方式从源读取数据并将其管道传输到目标。简单来说,流不过是一个 EventEmitter,并实现了一些特殊方法。根据实现的方法,流变为可读、可写、双工或转换。可读流允许你从源读取数据,而可写流允许你将数据写入目标。
如果你已经使用过 Node.js,你可能遇到过流。例如,在基于 Node.js 的 HTTP 服务器中,请求是一个可读流,响应是一个可写流。你可能使用过 fs 模块,它允许你处理可读和可写的文件流。
让我们了解不同类型的流。在本文中,我们将主要关注可读和可写流,但也会简要介绍双工和转换流。
可读流
可读流允许你从源读取数据。源可以是任何东西。它可以是文件系统上的简单文件、内存中的缓冲区,甚至是另一个流。由于流是 EventEmitters,它们在不同点发出多个事件。我们将使用这些事件来处理流。
从流中读取
从流中读取数据的最佳方法是监听 data 事件并附加回调。当数据块可用时,可读流会发出 data 事件,并且你的回调会执行。查看以下代码片段:
|
|
函数调用 fs.createReadStream()
给你一个可读流。最初,流处于静态状态。一旦你监听 data 事件并附加回调,它就开始流动。之后,数据块被读取并传递给回调。流实现者决定 data 事件发出的频率。例如,HTTP 请求可能在读取几 KB 数据后发出 data 事件。当你从文件读取数据时,你可能决定在读取一行后发出 data 事件。
当没有更多数据可读(到达末尾)时,流会发出 end 事件。在上面的代码片段中,我们监听此事件以在到达末尾时得到通知。
使用现代 ECMAScript 功能,我们可以使用 async/await 重写此代码:
|
|
这里,我们使用了几个较新的 JavaScript 功能:
for await...of
循环允许我们迭代异步可迭代对象(如 Node.js 中的流)- 我们创建了一个
streamToString
辅助函数,该函数从流中收集所有块,并返回一个解析为完整字符串的 Promise - 我们将所有内容包装在 try/catch 块中以进行适当的错误处理
- 这种方法比基于事件的方法更线性且更易于阅读
现在,可读流可以在两种模式下运行:
- 流动模式 – 数据自动读取,并通过事件尽快提供
- 暂停模式 – 你必须显式调用
read()
以重复获取数据块,直到读取每个数据块
|
|
read()
函数从内部缓冲区读取一些数据并返回它。当没有可读内容时,它返回 null。因此,在 while 循环中,我们检查 null 并终止循环。请注意,当可以从流中读取数据块时,会发出 readable 事件。
设置编码
默认情况下,从流中读取的数据是 Buffer 对象。如果你正在读取字符串,这可能不适合你。因此,你可以通过调用 Readable.setEncoding()
在流上设置编码,如下所示。
|
|
在上面的代码片段中,我们将编码设置为 utf8。因此,数据被解释为 utf8 并作为字符串传递给回调。
管道传输
管道传输是一种很好的机制,你可以从中读取数据并写入目标,而无需自己管理流。查看以下代码片段:
|
|
上面的代码片段利用 pipe()
函数将 file1 的内容写入 file2。由于 pipe()
为你管理数据流,你不应担心数据流慢或快。这使得 pipe()
成为读取和写入数据的简洁工具。你还应注意,pipe()
返回目标流。因此,你可以轻松地利用它来将多个流链接在一起。让我们看看如何!
然而,pipe()
的一个限制是它不提供良好的错误处理。这就是现代 Node.js 提供更好实用工具的地方:
|
|
这里:
- 我们使用来自流模块的
pipeline
函数,该函数自动处理错误和资源清理 - 我们使用
promisify
将基于回调的管道转换为 Promise - 然后我们可以使用 async/await 实现更清晰的流程
- 所有错误都在单个 try/catch 块中正确捕获
- 如果管道中的任何流发出错误,pipeline 会自动销毁所有流并使用错误调用回调
链式调用
假设你有一个存档并想要解压缩它。有多种方法可以实现这一点。但最简单和最干净的方法是使用管道和链式调用。查看以下代码片段:
|
|
首先,我们从文件 input.txt.gz 创建一个简单的可读流。接下来,我们将此流管道传输到另一个流 zlib.createGunzip()
以解压缩内容。最后,由于流可以链接,我们添加一个可写流以将解压缩的内容写入文件。
使用 pipeline 的更健壮的方法:
|
|
这里我们使用带有多个流的 pipeline:
- 与
pipe()
不同,pipe()
不能正确转发错误,pipeline 处理链中任何流的错误 - 如果管道中的任何流失败(例如文件不存在或内容不是有效的 gzip),回调会收到错误
- 如果任何流出错,pipeline 会自动通过销毁所有流来清理资源
- 最后一个参数是一个回调,它告诉我们操作是成功还是失败
其他方法
我们讨论了可读流中的一些重要概念。以下是你需要了解的更多流方法:
Readable.pause()
– 此方法暂停流。如果流已经在流动,它将不再发出 data 事件。数据将保留在缓冲区中。如果你在静态(非流动)流上调用此方法,则没有效果,并且流保持暂停状态Readable.resume()
– 恢复暂停的流readable.unpipe()
– 这会从管道目标中删除目标流。如果传递了参数,它会停止可读流管道传输到特定目标流。否则,所有目标流都会被删除
可写流
可写流允许你将数据写入目标。与可读流一样,这些也是 EventEmitters,并在不同点发出各种事件。让我们看看可写流中可用的各种方法和事件。
写入流
要将数据写入可写流,你需要在流实例上调用 write()
。以下代码片段演示了此技术。
|
|
上面的代码很简单。它只是从输入流读取数据块,并使用 write()
写入目标。此函数返回一个布尔值,指示操作是否成功。
writableStream.write(chunk)
的返回值指示内部缓冲区是否准备好接收更多数据,这对于处理背压至关重要:
true
:数据已成功写入,你可以立即继续写入更多数据false
:内部缓冲区已满(达到highWaterMark
限制)。这并不意味着发生错误,但信号表明你应该暂停写入以防止缓冲区过载。你应该等待'drain'
事件,然后再恢复写入
处理背压的更好方法:
|
|
此示例处理背压,这是流中的一个关键概念:
- 当
write()
返回 false 时,意味着内部缓冲区已满,我们应该停止发送更多数据 - 我们暂停可读流以暂时停止接收数据
- 当可写流发出 ‘drain’ 时,意味着缓冲区已清空,我们可以恢复读取
- 我们还为两个流添加了适当的错误处理
- 当读取完成时,我们在可写流上调用
end()
以发出完成信号 - 当写入者跟不上读取者时,这种方法可以防止内存无限增长
数据结束
当你没有更多数据要写入时,你可以简单地调用 end()
来通知流你已完成写入。假设 res 是一个 HTTP 响应对象,你经常执行以下操作以将响应发送到浏览器:
|
|
当调用 end()
并且每个数据块都已刷新时,流会发出 finish 事件。请注意,在调用 end()
后,你无法再写入流。例如,以下将导致错误。
|
|
以下是与可写流相关的一些重要事件:
error
– 发出以指示在写入/管道传输期间发生错误pipe
– 当可读流被管道传输到可写流时,此事件由可写流发出unpipe
– 当你在可读流上调用 unpipe 并停止其管道传输到目标流时发出
双工和转换流
双工流是可读和可写流的组合。它们维护两个独立的内部缓冲区,一个用于读取,一个用于写入,它们彼此独立运行。
双工流在你需要同时但独立的输入和输出流时非常有用,例如在网络套接字(如 TCP)中。
|
|
此示例创建了一个自定义双工流:
read()
方法生成从 A 到 Z 的大写字母(ASCII 代码 65-90)- 每次调用
read()
时,它都会推送下一个字母并递增计数器 - 当我们到达 ‘Z’ 时,我们推送 null 以发出读取流结束的信号
write()
方法只是将写入流的任何数据记录到控制台- 当你需要在单个流中进行独立的读取和写入操作时,双工流非常有用
转换流是一种特殊类型的双工流,可以在数据写入和读取时修改或转换数据。与双工流不同,双工流的输入和输出是分开的,转换流的输出与输入直接相关。典型示例包括用于压缩/解压缩的 zlib 流和用于加密/解密的加密流。
|
|
此转换流示例:
- 创建一个将输入文本转换为大写的转换流
transform()
方法获取输入块,转换它们,并将它们推送到输出- 我们从标准输入管道传输,通过我们的转换器,到标准输出
- 当你运行此代码时,你键入的任何内容都将以大写形式显示
- 转换流非常适合在数据流经时处理或修改数据,例如解析 JSON、转换编码或加密数据
结论
这就是流的基础知识。流、管道和链式调用是 Node.js 中核心且最强大的功能。如果负责任地使用,流确实可以帮助你编写简洁且高性能的代码来执行 I/O。只需确保处理流错误并适当关闭流以防止内存泄漏。
随着 Node.js API 的新增功能,如 stream.pipeline()
、stream.finished()
和基于 Promise 的流 API,处理流变得更加健壮和易于使用。当处理大量数据时,流应该是你实现高效内存使用和性能的首选解决方案。
什么是 Node.js 流?
什么是 Node.js 流? Node.js 流是 Node.js 标准库的一项功能,允许你以更高效和可扩展的方式处理数据,通过将其处理为更小、更易管理的块,而不是将整个数据集加载到内存中。
Node.js 流的主要类型有哪些? Node.js 流有四种主要类型:可读、可写、双工和转换。可读流用于读取数据,可写流用于写入数据,双工流允许读取和写入,转换流在数据通过时修改数据。
如何在 Node.js 中创建可读流? 要创建可读流,你可以使用 Node.js 提供的 stream.Readable
类。你可以扩展此类并实现 _read
方法以提供要读取的数据。
可读流的常见用例有哪些? 可读流对于读取大文件、处理来自外部源(如 HTTP 请求)的数据以及实时处理数据(如日志文件监控)非常有用。
如何在 Node.js 中创建可写流? 要创建可写流,你可以使用 Node.js 提供的 stream.Writable
类