Node.js 流基础:高效处理 I/O 的核心技术

本文深入解析 Node.js 中的流(Streams)机制,涵盖可读流、可写流、双工流和转换流的使用方法,以及如何通过 pipe() 和 pipeline() 实现数据高效传输和错误处理,适合需要处理大量 I/O 操作的开发者。

Node.js 流基础

Node.js 本质上是异步和事件驱动的,因此非常擅长处理 I/O 密集型任务。如果你正在开发一个执行 I/O 操作的应用程序,可以利用 Node.js 中提供的流。让我们详细探索流,并了解它们如何简化 I/O。

关键要点

  • Node.js 流是异步和事件驱动的,可以通过高效处理较小的、可管理的数据块来简化 I/O 操作。
  • 流可以分为可读(Readable)、可写(Writable)、双工(Duplex,既可读又可写)或转换(Transform,在数据通过时修改数据)。
  • pipe() 函数是 Node.js 流中的一个有用工具,允许从源读取数据并写入目标,而无需手动管理数据流。
  • 现代 Node.js 提供了诸如 stream.pipeline()stream.finished() 等实用工具,以及基于 Promise 的 API,以实现更好的错误处理和流控制。
  • 流可以与 async/await 模式一起使用,以实现更清晰、更可维护的代码。

什么是流

Node.js 中的流受到 Unix 管道的启发,并提供了一种机制,以流式方式从源读取数据并将其管道传输到目标。简单来说,流不过是一个 EventEmitter,并实现了一些特殊方法。根据实现的方法,流变为可读、可写、双工或转换。可读流允许你从源读取数据,而可写流允许你将数据写入目标。

如果你已经使用过 Node.js,你可能遇到过流。例如,在基于 Node.js 的 HTTP 服务器中,请求是一个可读流,响应是一个可写流。你可能使用过 fs 模块,它允许你处理可读和可写的文件流。

让我们了解不同类型的流。在本文中,我们将主要关注可读和可写流,但也会简要介绍双工和转换流。

可读流

可读流允许你从源读取数据。源可以是任何东西。它可以是文件系统上的简单文件、内存中的缓冲区,甚至是另一个流。由于流是 EventEmitters,它们在不同点发出多个事件。我们将使用这些事件来处理流。

从流中读取

从流中读取数据的最佳方法是监听 data 事件并附加回调。当数据块可用时,可读流会发出 data 事件,并且你的回调会执行。查看以下代码片段:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// 传统回调方法
const fs = require('fs');
const readableStream = fs.createReadStream('file.txt');
let data = '';

readableStream.on('data', function(chunk) {
  data += chunk;
});

readableStream.on('end', function() {
  console.log(data);
});

// 错误处理
readableStream.on('error', (err) => {
  console.error('读取流时出错:', err);
});

函数调用 fs.createReadStream() 给你一个可读流。最初,流处于静态状态。一旦你监听 data 事件并附加回调,它就开始流动。之后,数据块被读取并传递给回调。流实现者决定 data 事件发出的频率。例如,HTTP 请求可能在读取几 KB 数据后发出 data 事件。当你从文件读取数据时,你可能决定在读取一行后发出 data 事件。

当没有更多数据可读(到达末尾)时,流会发出 end 事件。在上面的代码片段中,我们监听此事件以在到达末尾时得到通知。

使用现代 ECMAScript 功能,我们可以使用 async/await 重写此代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
const fs = require('fs');
const { Readable } = require('stream');
const { promisify } = require('util');

// 将 stream.on('end') 转换为 Promise
const streamToString = async (stream) => {
  const chunks = [];
  
  for await (const chunk of stream) {
    chunks.push(typeof chunk === 'string' ? chunk : chunk.toString());
  }
  
  return chunks.join('');
};

async function readFile() {
  try {
    const readableStream = fs.createReadStream('file.txt');
    const content = await streamToString(readableStream);
    console.log(content);
  } catch (err) {
    console.error('读取文件时出错:', err);
  }
}

readFile();

这里,我们使用了几个较新的 JavaScript 功能:

  • for await...of 循环允许我们迭代异步可迭代对象(如 Node.js 中的流)
  • 我们创建了一个 streamToString 辅助函数,该函数从流中收集所有块,并返回一个解析为完整字符串的 Promise
  • 我们将所有内容包装在 try/catch 块中以进行适当的错误处理
  • 这种方法比基于事件的方法更线性且更易于阅读

现在,可读流可以在两种模式下运行:

  1. 流动模式 – 数据自动读取,并通过事件尽快提供
  2. 暂停模式 – 你必须显式调用 read() 以重复获取数据块,直到读取每个数据块
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
const fs = require('fs');
const readableStream = fs.createReadStream('file.txt');
let data = '';
let chunk;

readableStream.on('readable', function() {
  while ((chunk = readableStream.read()) != null) {
    data += chunk;
  }
});

readableStream.on('end', function() {
  console.log(data);
});

read() 函数从内部缓冲区读取一些数据并返回它。当没有可读内容时,它返回 null。因此,在 while 循环中,我们检查 null 并终止循环。请注意,当可以从流中读取数据块时,会发出 readable 事件。

设置编码

默认情况下,从流中读取的数据是 Buffer 对象。如果你正在读取字符串,这可能不适合你。因此,你可以通过调用 Readable.setEncoding() 在流上设置编码,如下所示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
const fs = require('fs');
const readableStream = fs.createReadStream('file.txt');
let data = '';

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  data += chunk;
});

readableStream.on('end', function() {
  console.log(data);
});

在上面的代码片段中,我们将编码设置为 utf8。因此,数据被解释为 utf8 并作为字符串传递给回调。

管道传输

管道传输是一种很好的机制,你可以从中读取数据并写入目标,而无需自己管理流。查看以下代码片段:

1
2
3
4
5
const fs = require('fs');
const readableStream = fs.createReadStream('file1.txt');
const writableStream = fs.createWriteStream('file2.txt');

readableStream.pipe(writableStream);

上面的代码片段利用 pipe() 函数将 file1 的内容写入 file2。由于 pipe() 为你管理数据流,你不应担心数据流慢或快。这使得 pipe() 成为读取和写入数据的简洁工具。你还应注意,pipe() 返回目标流。因此,你可以轻松地利用它来将多个流链接在一起。让我们看看如何!

然而,pipe() 的一个限制是它不提供良好的错误处理。这就是现代 Node.js 提供更好实用工具的地方:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
const fs = require('fs');
const { pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);

async function copyFile() {
  try {
    const readableStream = fs.createReadStream('file1.txt');
    const writableStream = fs.createWriteStream('file2.txt');
    
    await pipelineAsync(readableStream, writableStream);
    console.log('文件复制成功');
  } catch (err) {
    console.error('管道失败:', err);
  }
}

copyFile();

这里:

  • 我们使用来自流模块的 pipeline 函数,该函数自动处理错误和资源清理
  • 我们使用 promisify 将基于回调的管道转换为 Promise
  • 然后我们可以使用 async/await 实现更清晰的流程
  • 所有错误都在单个 try/catch 块中正确捕获
  • 如果管道中的任何流发出错误,pipeline 会自动销毁所有流并使用错误调用回调

链式调用

假设你有一个存档并想要解压缩它。有多种方法可以实现这一点。但最简单和最干净的方法是使用管道和链式调用。查看以下代码片段:

1
2
3
4
5
6
const fs = require('fs');
const zlib = require('zlib');

fs.createReadStream('input.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('output.txt'));

首先,我们从文件 input.txt.gz 创建一个简单的可读流。接下来,我们将此流管道传输到另一个流 zlib.createGunzip() 以解压缩内容。最后,由于流可以链接,我们添加一个可写流以将解压缩的内容写入文件。

使用 pipeline 的更健壮的方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream');

pipeline(
  fs.createReadStream('input.txt.gz'),
  zlib.createGunzip(),
  fs.createWriteStream('output.txt'),
  (err) => {
    if (err) {
      console.error('管道失败:', err);
    } else {
      console.log('管道成功');
    }
  }
);

这里我们使用带有多个流的 pipeline:

  • pipe() 不同,pipe() 不能正确转发错误,pipeline 处理链中任何流的错误
  • 如果管道中的任何流失败(例如文件不存在或内容不是有效的 gzip),回调会收到错误
  • 如果任何流出错,pipeline 会自动通过销毁所有流来清理资源
  • 最后一个参数是一个回调,它告诉我们操作是成功还是失败

其他方法

我们讨论了可读流中的一些重要概念。以下是你需要了解的更多流方法:

  • Readable.pause() – 此方法暂停流。如果流已经在流动,它将不再发出 data 事件。数据将保留在缓冲区中。如果你在静态(非流动)流上调用此方法,则没有效果,并且流保持暂停状态
  • Readable.resume() – 恢复暂停的流
  • readable.unpipe() – 这会从管道目标中删除目标流。如果传递了参数,它会停止可读流管道传输到特定目标流。否则,所有目标流都会被删除

可写流

可写流允许你将数据写入目标。与可读流一样,这些也是 EventEmitters,并在不同点发出各种事件。让我们看看可写流中可用的各种方法和事件。

写入流

要将数据写入可写流,你需要在流实例上调用 write()。以下代码片段演示了此技术。

1
2
3
4
5
6
7
8
9
const fs = require('fs');
const readableStream = fs.createReadStream('file1.txt');
const writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  writableStream.write(chunk);
});

上面的代码很简单。它只是从输入流读取数据块,并使用 write() 写入目标。此函数返回一个布尔值,指示操作是否成功。

writableStream.write(chunk) 的返回值指示内部缓冲区是否准备好接收更多数据,这对于处理背压至关重要:

  • true:数据已成功写入,你可以立即继续写入更多数据
  • false:内部缓冲区已满(达到 highWaterMark 限制)。这并不意味着发生错误,但信号表明你应该暂停写入以防止缓冲区过载。你应该等待 'drain' 事件,然后再恢复写入

处理背压的更好方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
const fs = require('fs');
const readableStream = fs.createReadStream('file1.txt');
const writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  // 如果 write 返回 false,暂停读取直到 drain 事件
  const canContinue = writableStream.write(chunk);
  if (!canContinue) {
    readableStream.pause();
  }
});

writableStream.on('drain', function() {
  // 当 drain 事件发生时恢复读取
  readableStream.resume();
});

readableStream.on('end', function() {
  writableStream.end();
});

// 错误处理
readableStream.on('error', (err) => {
  console.error('读取错误:', err);
  writableStream.end();
});

writableStream.on('error', (err) => {
  console.error('写入错误:', err);
});

此示例处理背压,这是流中的一个关键概念:

  • write() 返回 false 时,意味着内部缓冲区已满,我们应该停止发送更多数据
  • 我们暂停可读流以暂时停止接收数据
  • 当可写流发出 ‘drain’ 时,意味着缓冲区已清空,我们可以恢复读取
  • 我们还为两个流添加了适当的错误处理
  • 当读取完成时,我们在可写流上调用 end() 以发出完成信号
  • 当写入者跟不上读取者时,这种方法可以防止内存无限增长

数据结束

当你没有更多数据要写入时,你可以简单地调用 end() 来通知流你已完成写入。假设 res 是一个 HTTP 响应对象,你经常执行以下操作以将响应发送到浏览器:

1
2
res.write('Some Data!!');
res.end('Ended.');

当调用 end() 并且每个数据块都已刷新时,流会发出 finish 事件。请注意,在调用 end() 后,你无法再写入流。例如,以下将导致错误。

1
2
3
res.write('Some Data!!');
res.end();
res.write('Trying to write again'); //错误!

以下是与可写流相关的一些重要事件:

  • error – 发出以指示在写入/管道传输期间发生错误
  • pipe – 当可读流被管道传输到可写流时,此事件由可写流发出
  • unpipe – 当你在可读流上调用 unpipe 并停止其管道传输到目标流时发出

双工和转换流

双工流是可读和可写流的组合。它们维护两个独立的内部缓冲区,一个用于读取,一个用于写入,它们彼此独立运行。

双工流在你需要同时但独立的输入和输出流时非常有用,例如在网络套接字(如 TCP)中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
const { Duplex } = require('stream');

// 创建自定义双工流
const myDuplex = new Duplex({
  read(size) {
    // 读取的实现
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  },
  write(chunk, encoding, callback) {
    // 写入的实现
    console.log(chunk.toString());
    callback();
  }
});

// 初始化起始字符代码
myDuplex.currentCharCode = 65; // ASCII 为 'A'

此示例创建了一个自定义双工流:

  • read() 方法生成从 A 到 Z 的大写字母(ASCII 代码 65-90)
  • 每次调用 read() 时,它都会推送下一个字母并递增计数器
  • 当我们到达 ‘Z’ 时,我们推送 null 以发出读取流结束的信号
  • write() 方法只是将写入流的任何数据记录到控制台
  • 当你需要在单个流中进行独立的读取和写入操作时,双工流非常有用

转换流是一种特殊类型的双工流,可以在数据写入和读取时修改或转换数据。与双工流不同,双工流的输入和输出是分开的,转换流的输出与输入直接相关。典型示例包括用于压缩/解压缩的 zlib 流和用于加密/解密的加密流。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
const { Transform } = require('stream');

// 创建将传入数据转换为大写的转换流
const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    // 将块转换为大写并将其推送到读取队列
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// 使用转换流
process.stdin
  .pipe(upperCaseTr)
  .pipe(process.stdout);

此转换流示例:

  • 创建一个将输入文本转换为大写的转换流
  • transform() 方法获取输入块,转换它们,并将它们推送到输出
  • 我们从标准输入管道传输,通过我们的转换器,到标准输出
  • 当你运行此代码时,你键入的任何内容都将以大写形式显示
  • 转换流非常适合在数据流经时处理或修改数据,例如解析 JSON、转换编码或加密数据

结论

这就是流的基础知识。流、管道和链式调用是 Node.js 中核心且最强大的功能。如果负责任地使用,流确实可以帮助你编写简洁且高性能的代码来执行 I/O。只需确保处理流错误并适当关闭流以防止内存泄漏。

随着 Node.js API 的新增功能,如 stream.pipeline()stream.finished() 和基于 Promise 的流 API,处理流变得更加健壮和易于使用。当处理大量数据时,流应该是你实现高效内存使用和性能的首选解决方案。

什么是 Node.js 流?

什么是 Node.js 流? Node.js 流是 Node.js 标准库的一项功能,允许你以更高效和可扩展的方式处理数据,通过将其处理为更小、更易管理的块,而不是将整个数据集加载到内存中。

Node.js 流的主要类型有哪些? Node.js 流有四种主要类型:可读、可写、双工和转换。可读流用于读取数据,可写流用于写入数据,双工流允许读取和写入,转换流在数据通过时修改数据。

如何在 Node.js 中创建可读流? 要创建可读流,你可以使用 Node.js 提供的 stream.Readable 类。你可以扩展此类并实现 _read 方法以提供要读取的数据。

可读流的常见用例有哪些? 可读流对于读取大文件、处理来自外部源(如 HTTP 请求)的数据以及实时处理数据(如日志文件监控)非常有用。

如何在 Node.js 中创建可写流? 要创建可写流,你可以使用 Node.js 提供的 stream.Writable

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计