Node.js Streams with TypeScript
Node.js 以其高效处理 I/O 操作的能力而闻名,而这一能力的核心在于流的概念。流允许您逐块处理数据,而不是一次性将所有内容加载到内存中——非常适合处理大文件、网络请求或实时数据。当您将流与 TypeScript 的强类型结合时,您将获得一个强大的组合:性能与安全性并存。
在本指南中,我们将深入探讨 Node.js 流,探索它们的类型,并通过使用 TypeScript 的实际示例进行讲解。无论您是 Node.js 新手还是希望提升技能的 TypeScript 爱好者,本文都将为您提供所需的内容。
为什么流很重要?
想象一下:您需要处理一个 50GB 的日志文件。将其完全加载到内存中会耗尽服务器的资源,导致崩溃或性能缓慢。流通过让您处理流动的数据来解决这个问题,就像用吸管啜饮而不是猛灌一加仑的壶。
这种效率是流成为 Node.js 基石的原因,为从文件操作到 HTTP 服务器的所有功能提供动力。TypeScript 通过添加类型定义、在编译时捕获错误以及提高代码可读性来增强这一点。让我们深入了解基础知识,看看这种协同作用在实践中是如何工作的。
四种类型的流
Node.js 提供四种主要的流类型,每种都有特定的用途:
- 可读流:您可以从中读取的数据源(例如,文件、HTTP 响应)。
- 可写流:您可以写入的目标(例如,文件、HTTP 请求)。
- 双工流:既可读又可写(例如,TCP 套接字)。
- 转换流:一种特殊的双工流,在数据通过时修改数据(例如,压缩)。
TypeScript 通过允许我们为流经它们的数据定义接口来增强这一点。让我们通过示例来分解它们。
设置您的 TypeScript 环境
在深入代码之前,请确保您已安装 Node.js 和 TypeScript。
创建一个新项目:
1
2
3
4
5
|
mkdir node-streams-typescript
cd node-streams-typescript
npm init -y
npm install typescript @types/node --save-dev
npx tsc --init
|
更新您的 tsconfig.json 以包含:
1
2
3
4
5
6
7
8
9
|
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"strict": true,
"outDir": "./dist"
},
"include": ["src/**/*"]
}
|
创建一个 src 文件夹,然后开始编码!
示例 1:使用可读流读取文件
让我们逐块读取一个文本文件。首先,在项目的根目录中创建一个名为 data.txt 的文件,其中包含一些示例文本(例如,“Hello, streams!”)。
现在,在 src/readStream.ts 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import { createReadStream } from 'fs';
import { Readable } from 'stream';
const readStream: Readable = createReadStream('data.txt', { encoding: 'utf8' });
readStream
.on('data', (chunk: string) => {
console.log('Chunk received:', chunk);
})
.on('end', () => {
console.log('Finished reading the file.');
})
.on('error', (err: Error) => {
console.error('Error:', err.message);
});
|
运行它:
1
|
npx tsc && node dist/readStream.js
|
在这里,TypeScript 确保块符合我们的 Chunk 接口,并且错误事件处理程序期望一个 Error 类型。此流以块(文件默认为 64KB)读取 data.txt 并记录它们。
示例 2:使用可写流写入数据
现在,让我们将数据写入一个新文件。在 src/writeStream.ts 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import { createWriteStream } from 'fs';
import { Writable } from 'stream';
const writeStream: Writable = createWriteStream('output.txt', { encoding: 'utf8' });
const data: string[] = ['Line 1\n', 'Line 2\n', 'Line 3\n'];
data.forEach((line: string) => {
writeStream.write(line);
});
writeStream.end(() => {
console.log('Finished writing to output.txt');
});
writeStream.on('error', (err: Error) => {
console.error('Error:', err.message);
});
|
编译并运行:
1
|
npx tsc && node dist/writeStream.js
|
这将创建包含三行的 output.txt。TypeScript 确保行是字符串,并为流方法提供自动完成。
示例 3:使用转换流进行管道传输
管道是流发挥优势的地方,将可读流连接到可写流。让我们通过一个转换流来增加一个转折,将我们的文本转换为大写。
在 src/transformStream.ts 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import { createReadStream, createWriteStream } from 'fs';
import { Transform, TransformCallback } from 'stream';
// 自定义转换流以将文本转换为大写
class UppercaseTransform extends Transform {
_transform(chunk: Buffer, encoding: string, callback: TransformCallback): void {
const upperChunk = chunk.toString().toUpperCase();
this.push(upperChunk);
callback();
}
}
const readStream = createReadStream('data.txt', { encoding: 'utf8' });
const writeStream = createWriteStream('output_upper.txt');
const transformStream = new UppercaseTransform();
readStream
.pipe(transformStream)
.pipe(writeStream)
.on('finish', () => {
console.log('Transform complete! Check output_upper.txt');
})
.on('error', (err: Error) => {
console.error('Error:', err.message);
});
|
运行它:
1
|
npx tsc && node dist/transformStream.js
|
这将读取 data.txt,将文本转换为大写,并将其写入 output_upper.txt。
TypeScript 的 TransformCallback 类型确保我们的 _transform 方法正确实现。
示例 4:使用双工流压缩文件
让我们处理一个更高级的场景:使用 zlib 模块压缩文件,该模块提供了一个双工流。它随我们之前安装的 @types/node 包一起提供。
在 src/compressStream.ts 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
import { pipeline } from 'stream';
const source = createReadStream('data.txt');
const destination = createWriteStream('data.txt.gz');
const gzip = createGzip();
pipeline(source, gzip, destination, (err: Error | null) => {
if (err) {
console.error('Compression failed:', err.message);
return;
}
console.log('File compressed successfully! Check data.txt.gz');
});
|
运行它:
1
|
npx tsc && node dist/compressStream.js
|
在这里,pipeline 确保适当的错误处理和清理。gzip 流将 data.txt 压缩为 data.txt.gz。TypeScript 的类型推断保持我们的代码简洁和安全。
示例 5:流式传输 HTTP 响应
流在网络操作中表现出色。让我们使用 axios 模拟从 HTTP 服务器流式传输数据。安装它:
1
|
npm install axios @types/axios
|
在 src/httpStream.ts 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import axios from 'axios';
import { createWriteStream } from 'fs';
import { Writable } from 'stream';
async function streamHttpResponse(url: string, outputFile: string): Promise<void> {
const response = await axios({
method: 'get',
url,
responseType: 'stream',
});
const writeStream: Writable = createWriteStream(outputFile);
response.data.pipe(writeStream);
return new Promise((resolve, reject) => {
writeStream.on('finish', () => {
console.log(`Downloaded to ${outputFile}`);
resolve();
});
writeStream.on('error', (err: Error) => {
console.error('Download failed:', err.message);
reject(err);
});
});
}
streamHttpResponse('https://example.com', 'example.html').catch(console.error);
|
运行它:
1
|
npx tsc && node dist/httpStream.js
|
这将流式传输 HTTP 响应(例如,网页)到 example.html。TypeScript 确保 url 和 outputFile 参数是字符串,并且 Promise 类型增加了清晰度。
我们还可以使用 Node.js 的内置 Fetch API(自 Node v18 起可用)或像 node-fetch 这样的库,它们也支持流式响应,尽管流类型可能不同(Web 流与 Node.js 流)。
示例:
1
2
3
|
const response = await fetch('https://example.com');
const writeStream = createWriteStream(outputFile);
response.body.pipe(writeStream);
|
示例 6:使用自定义可读流进行实时数据处理
让我们创建一个自定义的可读流来模拟实时数据,例如传感器读数。在 src/customReadable.ts 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import { Readable } from 'stream';
class SensorStream extends Readable {
private count: number = 0;
private max: number = 10;
constructor(options?: any) {
super(options);
}
_read(): void {
if (this.count < this.max) {
const data = `Sensor reading ${this.count}: ${Math.random() * 100}\n`;
this.push(data);
this.count++;
} else {
this.push(null); // 发出流结束信号
}
}
}
const sensor = new SensorStream({ encoding: 'utf8' });
sensor
.on('data', (chunk: string) => {
console.log('Received:', chunk.trim());
})
.on('end', () => {
console.log('Sensor stream complete.');
})
.on('error', (err: Error) => {
console.error('Error:', err.message);
});
|
运行它:
1
|
npx tsc && node dist/customReadable.js
|
这将生成 10 个随机“传感器读数”并流式传输它们。TypeScript 的类类型确保我们的实现与 Readable 接口对齐。
示例 7:链接多个转换流
让我们链接转换以分阶段处理文本:将其转换为大写,然后添加时间戳。在 src/chainTransform.ts 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import { createReadStream, createWriteStream } from 'fs';
import { Transform, TransformCallback } from 'stream';
class UppercaseTransform extends Transform {
_transform(chunk: Buffer, encoding: string, callback: TransformCallback): void {
this.push(chunk.toString().toUpperCase());
callback();
}
}
class TimestampTransform extends Transform {
_transform(chunk: Buffer, encoding: string, callback: TransformCallback): void {
const timestamp = new Date().toISOString();
this.push(`[${timestamp}] ${chunk.toString()}`);
callback();
}
}
const readStream = createReadStream('data.txt', { encoding: 'utf8' });
const writeStream = createWriteStream('output_chain.txt');
const upper = new UppercaseTransform();
const timestamp = new TimestampTransform();
readStream
.pipe(upper)
.pipe(timestamp)
.pipe(writeStream)
.on('finish', () => {
console.log('Chained transform complete! Check output_chain.txt');
})
.on('error', (err: Error) => {
console.error('Error:', err.message);
});
|
运行它:
1
|
npx tsc && node dist/chainTransform.js
|
这将读取 data.txt,将数据转换为大写,添加时间戳,并将结果写入 output_chain.txt。链接转换展示了流的模块化。
TypeScript 中流的最佳实践
- 类型化您的数据:为块定义接口以尽早捕获类型错误。
- 处理错误:始终附加错误事件监听器以避免未处理的异常。
- 明智地使用管道:管道减少了手动事件处理并提高了可读性。
- 背压:对于大数据,监控
writeStream.writableHighWaterMark 以避免压倒目标。
实际用例:流式传输 API 响应
假设您正在构建一个流式传输大型数据集的 API。使用 express 和流:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import express from 'express';
import { Readable } from 'stream';
const app = express();
app.get('/stream-data', (req, res) => {
const data = ['Item 1\n', 'Item 2\n', 'Item 3\n'];
const stream = Readable.from(data);
res.setHeader('Content-Type', 'text/plain');
stream.pipe(res);
});
app.listen(3000, () => {
console.log('Server running on port 3000');
});
|
安装依赖项(npm install express @types/express),然后运行它。访问 http://localhost:3000/stream-data 以在浏览器中查看数据流!
高级技巧:处理背压
当可写流无法跟上可读流时,会发生背压。Node.js 通过管道自动处理此问题,但您可以手动监控它:
1
2
3
4
5
6
7
8
|
const writeStream = createWriteStream('large_output.txt');
if (!writeStream.write('data')) {
console.log('Backpressure detected! Pausing...');
writeStream.once('drain', () => {
console.log('Resuming...');
});
}
|
这确保您的应用在重负载下保持响应。
使用背压的注意事项:当写入大量数据时,可读流可能产生数据的速度快于可写流消耗的速度。虽然 pipe 和 pipeline 自动处理此问题,但如果手动写入,请检查 write() 是否返回 false,并在写入更多数据之前等待 'drain' 事件。
此外,异步迭代器(for await...of)是使用可读流的现代替代方案,与使用 .on('data') 和 .on('end') 相比,通常可以简化代码。
示例:
1
2
3
4
5
6
|
async function processStream(readable: Readable) {
for await (const chunk of readable) {
console.log('Chunk:', chunk);
}
console.log('Finished reading.');
}
|
其他要点:
- 确保资源清理:这在自定义流实现或使用
stream.pipeline 时尤其重要。在错误场景中或不再需要流时显式调用 stream.destroy(),以释放底层资源并防止泄漏。stream.pipeline 为管道流自动处理此问题。
- 使用
Readable.from() 方便操作:当您需要从现有可迭代对象(例如数组)或异步可迭代对象创建流时,Readable.from() 通常是最简单和最现代的方法,比创建自定义 Readable 类需要更少的样板代码。
结论
流是 Node.js 中的游戏规则改变者,而 TypeScript 通过引入类型安全性和清晰度进一步增强了它们。从读取文件到实时转换数据,掌握流打开了一个高效 I/O 可能性的世界。这里的示例——读取、写入、更改、压缩和通过 HTTP 流式传输——只是可能性的表面。
尝试您自己的管道:尝试流式传输日志、处理 CSV 文件或构建实时聊天系统。您探索得越多,就越会欣赏流的多样性。