TypeScript 与 Node.js 流:高效处理大数据的终极指南

本文深入探讨了如何在 TypeScript 中使用 Node.js 流处理数据,涵盖可读流、可写流、双工流和转换流四种类型,并通过实际代码示例展示文件操作、HTTP 响应流和实时数据处理等场景。

Node.js Streams with TypeScript

Node.js 以其高效处理 I/O 操作的能力而闻名,而这一能力的核心在于流的概念。流允许您逐块处理数据,而不是一次性将所有内容加载到内存中——非常适合处理大文件、网络请求或实时数据。当您将流与 TypeScript 的强类型结合时,您将获得一个强大的组合:性能与安全性并存。

在本指南中,我们将深入探讨 Node.js 流,探索它们的类型,并通过使用 TypeScript 的实际示例进行讲解。无论您是 Node.js 新手还是希望提升技能的 TypeScript 爱好者,本文都将为您提供所需的内容。

为什么流很重要?

想象一下:您需要处理一个 50GB 的日志文件。将其完全加载到内存中会耗尽服务器的资源,导致崩溃或性能缓慢。流通过让您处理流动的数据来解决这个问题,就像用吸管啜饮而不是猛灌一加仑的壶。

这种效率是流成为 Node.js 基石的原因,为从文件操作到 HTTP 服务器的所有功能提供动力。TypeScript 通过添加类型定义、在编译时捕获错误以及提高代码可读性来增强这一点。让我们深入了解基础知识,看看这种协同作用在实践中是如何工作的。

四种类型的流

Node.js 提供四种主要的流类型,每种都有特定的用途:

  • 可读流:您可以从中读取的数据源(例如,文件、HTTP 响应)。
  • 可写流:您可以写入的目标(例如,文件、HTTP 请求)。
  • 双工流:既可读又可写(例如,TCP 套接字)。
  • 转换流:一种特殊的双工流,在数据通过时修改数据(例如,压缩)。

TypeScript 通过允许我们为流经它们的数据定义接口来增强这一点。让我们通过示例来分解它们。

设置您的 TypeScript 环境

在深入代码之前,请确保您已安装 Node.js 和 TypeScript。

创建一个新项目:

1
2
3
4
5
mkdir node-streams-typescript
cd node-streams-typescript
npm init -y
npm install typescript @types/node --save-dev
npx tsc --init

更新您的 tsconfig.json 以包含:

1
2
3
4
5
6
7
8
9
{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "strict": true,
    "outDir": "./dist"
  },
  "include": ["src/**/*"]
}

创建一个 src 文件夹,然后开始编码!

示例 1:使用可读流读取文件

让我们逐块读取一个文本文件。首先,在项目的根目录中创建一个名为 data.txt 的文件,其中包含一些示例文本(例如,“Hello, streams!”)。

现在,在 src/readStream.ts 中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import { createReadStream } from 'fs';
import { Readable } from 'stream';

const readStream: Readable = createReadStream('data.txt', { encoding: 'utf8' });

readStream
  .on('data', (chunk: string) => {
    console.log('Chunk received:', chunk);
  })
  .on('end', () => {
    console.log('Finished reading the file.');
  })
  .on('error', (err: Error) => {
    console.error('Error:', err.message);
  });

运行它:

1
npx tsc && node dist/readStream.js

在这里,TypeScript 确保块符合我们的 Chunk 接口,并且错误事件处理程序期望一个 Error 类型。此流以块(文件默认为 64KB)读取 data.txt 并记录它们。

示例 2:使用可写流写入数据

现在,让我们将数据写入一个新文件。在 src/writeStream.ts 中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import { createWriteStream } from 'fs';
import { Writable } from 'stream';

const writeStream: Writable = createWriteStream('output.txt', { encoding: 'utf8' });

const data: string[] = ['Line 1\n', 'Line 2\n', 'Line 3\n'];

data.forEach((line: string) => {
  writeStream.write(line);
});

writeStream.end(() => {
  console.log('Finished writing to output.txt');
});

writeStream.on('error', (err: Error) => {
  console.error('Error:', err.message);
});

编译并运行:

1
npx tsc && node dist/writeStream.js

这将创建包含三行的 output.txt。TypeScript 确保行是字符串,并为流方法提供自动完成。

示例 3:使用转换流进行管道传输

管道是流发挥优势的地方,将可读流连接到可写流。让我们通过一个转换流来增加一个转折,将我们的文本转换为大写。

src/transformStream.ts 中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import { createReadStream, createWriteStream } from 'fs';
import { Transform, TransformCallback } from 'stream';

// 自定义转换流以将文本转换为大写
class UppercaseTransform extends Transform {
  _transform(chunk: Buffer, encoding: string, callback: TransformCallback): void {
    const upperChunk = chunk.toString().toUpperCase();
    this.push(upperChunk);
    callback();
  }
}

const readStream = createReadStream('data.txt', { encoding: 'utf8' });
const writeStream = createWriteStream('output_upper.txt');
const transformStream = new UppercaseTransform();

readStream
  .pipe(transformStream)
  .pipe(writeStream)
  .on('finish', () => {
    console.log('Transform complete! Check output_upper.txt');
  })
  .on('error', (err: Error) => {
    console.error('Error:', err.message);
  });

运行它:

1
npx tsc && node dist/transformStream.js

这将读取 data.txt,将文本转换为大写,并将其写入 output_upper.txt

TypeScript 的 TransformCallback 类型确保我们的 _transform 方法正确实现。

示例 4:使用双工流压缩文件

让我们处理一个更高级的场景:使用 zlib 模块压缩文件,该模块提供了一个双工流。它随我们之前安装的 @types/node 包一起提供。

src/compressStream.ts 中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
import { pipeline } from 'stream';

const source = createReadStream('data.txt');
const destination = createWriteStream('data.txt.gz');
const gzip = createGzip();

pipeline(source, gzip, destination, (err: Error | null) => {
  if (err) {
    console.error('Compression failed:', err.message);
    return;
  }
  console.log('File compressed successfully! Check data.txt.gz');
});

运行它:

1
npx tsc && node dist/compressStream.js

在这里,pipeline 确保适当的错误处理和清理。gzip 流将 data.txt 压缩为 data.txt.gz。TypeScript 的类型推断保持我们的代码简洁和安全。

示例 5:流式传输 HTTP 响应

流在网络操作中表现出色。让我们使用 axios 模拟从 HTTP 服务器流式传输数据。安装它:

1
npm install axios @types/axios

src/httpStream.ts 中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import axios from 'axios';
import { createWriteStream } from 'fs';
import { Writable } from 'stream';

async function streamHttpResponse(url: string, outputFile: string): Promise<void> {
  const response = await axios({
    method: 'get',
    url,
    responseType: 'stream',
  });

  const writeStream: Writable = createWriteStream(outputFile);
  response.data.pipe(writeStream);

  return new Promise((resolve, reject) => {
    writeStream.on('finish', () => {
      console.log(`Downloaded to ${outputFile}`);
      resolve();
    });
    writeStream.on('error', (err: Error) => {
      console.error('Download failed:', err.message);
      reject(err);
    });
  });
}

streamHttpResponse('https://example.com', 'example.html').catch(console.error);

运行它:

1
npx tsc && node dist/httpStream.js

这将流式传输 HTTP 响应(例如,网页)到 example.html。TypeScript 确保 urloutputFile 参数是字符串,并且 Promise 类型增加了清晰度。

我们还可以使用 Node.js 的内置 Fetch API(自 Node v18 起可用)或像 node-fetch 这样的库,它们也支持流式响应,尽管流类型可能不同(Web 流与 Node.js 流)。

示例:

1
2
3
const response = await fetch('https://example.com');
const writeStream = createWriteStream(outputFile);
response.body.pipe(writeStream);

示例 6:使用自定义可读流进行实时数据处理

让我们创建一个自定义的可读流来模拟实时数据,例如传感器读数。在 src/customReadable.ts 中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import { Readable } from 'stream';

class SensorStream extends Readable {
  private count: number = 0;
  private max: number = 10;

  constructor(options?: any) {
    super(options);
  }

  _read(): void {
    if (this.count < this.max) {
      const data = `Sensor reading ${this.count}: ${Math.random() * 100}\n`;
      this.push(data);
      this.count++;
    } else {
      this.push(null); // 发出流结束信号
    }
  }
}

const sensor = new SensorStream({ encoding: 'utf8' });

sensor
  .on('data', (chunk: string) => {
    console.log('Received:', chunk.trim());
  })
  .on('end', () => {
    console.log('Sensor stream complete.');
  })
  .on('error', (err: Error) => {
    console.error('Error:', err.message);
  });

运行它:

1
npx tsc && node dist/customReadable.js

这将生成 10 个随机“传感器读数”并流式传输它们。TypeScript 的类类型确保我们的实现与 Readable 接口对齐。

示例 7:链接多个转换流

让我们链接转换以分阶段处理文本:将其转换为大写,然后添加时间戳。在 src/chainTransform.ts 中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import { createReadStream, createWriteStream } from 'fs';
import { Transform, TransformCallback } from 'stream';

class UppercaseTransform extends Transform {
  _transform(chunk: Buffer, encoding: string, callback: TransformCallback): void {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

class TimestampTransform extends Transform {
  _transform(chunk: Buffer, encoding: string, callback: TransformCallback): void {
    const timestamp = new Date().toISOString();
    this.push(`[${timestamp}] ${chunk.toString()}`);
    callback();
  }
}

const readStream = createReadStream('data.txt', { encoding: 'utf8' });
const writeStream = createWriteStream('output_chain.txt');
const upper = new UppercaseTransform();
const timestamp = new TimestampTransform();

readStream
  .pipe(upper)
  .pipe(timestamp)
  .pipe(writeStream)
  .on('finish', () => {
    console.log('Chained transform complete! Check output_chain.txt');
  })
  .on('error', (err: Error) => {
    console.error('Error:', err.message);
  });

运行它:

1
npx tsc && node dist/chainTransform.js

这将读取 data.txt,将数据转换为大写,添加时间戳,并将结果写入 output_chain.txt。链接转换展示了流的模块化。

TypeScript 中流的最佳实践

  • 类型化您的数据:为块定义接口以尽早捕获类型错误。
  • 处理错误:始终附加错误事件监听器以避免未处理的异常。
  • 明智地使用管道:管道减少了手动事件处理并提高了可读性。
  • 背压:对于大数据,监控 writeStream.writableHighWaterMark 以避免压倒目标。

实际用例:流式传输 API 响应

假设您正在构建一个流式传输大型数据集的 API。使用 express 和流:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import express from 'express';
import { Readable } from 'stream';

const app = express();

app.get('/stream-data', (req, res) => {
  const data = ['Item 1\n', 'Item 2\n', 'Item 3\n'];
  const stream = Readable.from(data);

  res.setHeader('Content-Type', 'text/plain');
  stream.pipe(res);
});

app.listen(3000, () => {
  console.log('Server running on port 3000');
});

安装依赖项(npm install express @types/express),然后运行它。访问 http://localhost:3000/stream-data 以在浏览器中查看数据流!

高级技巧:处理背压

当可写流无法跟上可读流时,会发生背压。Node.js 通过管道自动处理此问题,但您可以手动监控它:

1
2
3
4
5
6
7
8
const writeStream = createWriteStream('large_output.txt');

if (!writeStream.write('data')) {
  console.log('Backpressure detected! Pausing...');
  writeStream.once('drain', () => {
    console.log('Resuming...');
  });
}

这确保您的应用在重负载下保持响应。

使用背压的注意事项:当写入大量数据时,可读流可能产生数据的速度快于可写流消耗的速度。虽然 pipepipeline 自动处理此问题,但如果手动写入,请检查 write() 是否返回 false,并在写入更多数据之前等待 'drain' 事件。

此外,异步迭代器(for await...of)是使用可读流的现代替代方案,与使用 .on('data').on('end') 相比,通常可以简化代码。

示例:

1
2
3
4
5
6
async function processStream(readable: Readable) {
  for await (const chunk of readable) {
    console.log('Chunk:', chunk);
  }
  console.log('Finished reading.');
}

其他要点:

  • 确保资源清理:这在自定义流实现或使用 stream.pipeline 时尤其重要。在错误场景中或不再需要流时显式调用 stream.destroy(),以释放底层资源并防止泄漏。stream.pipeline 为管道流自动处理此问题。
  • 使用 Readable.from() 方便操作:当您需要从现有可迭代对象(例如数组)或异步可迭代对象创建流时,Readable.from() 通常是最简单和最现代的方法,比创建自定义 Readable 类需要更少的样板代码。

结论

流是 Node.js 中的游戏规则改变者,而 TypeScript 通过引入类型安全性和清晰度进一步增强了它们。从读取文件到实时转换数据,掌握流打开了一个高效 I/O 可能性的世界。这里的示例——读取、写入、更改、压缩和通过 HTTP 流式传输——只是可能性的表面。

尝试您自己的管道:尝试流式传输日志、处理 CSV 文件或构建实时聊天系统。您探索得越多,就越会欣赏流的多样性。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计