如何在Node.js中使用Worker Threads实现多线程编程[完整手册]
JavaScript是一种单线程编程语言,而Node.js是JavaScript的运行时环境。这意味着JavaScript基本上在Node.js中运行,所有操作都是通过单个线程处理的。
但是当我们执行需要大量处理的任务时,Node.js的性能可能会开始下降。许多人错误地认为Node.js不好或者JavaScript有缺陷。但实际上有一个解决方案。JavaScript也可以有效地用于多线程。
在本文中,我们将重点关注后端:具体来说,如何使用Node.js在服务器端实现多线程。
我们将涵盖的内容
- 先决条件
- 使用ExpressJS进行项目设置
- 理解问题
- 理解JavaScript执行
- CPU密集型问题
- 如何实现Worker Threads
- 线程间通信
- 如何使用多核进行优化
- 性能比较
- 总结
先决条件
要跟随本指南并获得最大收益,您应该具备:
- 基本的JavaScript(ES6风格)知识
- 熟悉Node.js基础知识
- 使用Express(或类似框架)的Web服务器基础
- 理解Node.js/JavaScript中的阻塞与非阻塞操作
- 熟悉异步代码(Promises/async/await)和基于事件的处理
- 使用Node.js设置简单的开发环境
使用ExpressJS进行项目设置
在本节中,我们将详细介绍使用Express设置Node.js项目的步骤。
1. 创建新项目文件夹
首先为项目创建一个新文件夹。打开终端或命令提示符并运行:
|
|
2. 初始化Node.js项目
每个Node.js项目都需要一个package.json文件来管理依赖项和脚本。运行:
|
|
3. 安装Express.js
Express是Node.js的轻量级Web框架。使用以下命令安装:
|
|
4. 可选:为开发安装Nodemon
Nodemon在您进行更改时自动重新启动服务器。这在开发过程中非常有用。
|
|
接下来,更新package.json脚本:
|
|
现在您可以使用以下命令启动服务器:
|
|
5. 创建主服务器文件
创建一个名为index.js的文件。这将是应用程序的主要入口点:
|
|
打开index.js并添加以下代码:
|
|
6. 运行项目
使用Nodemon启动服务器:
|
|
或者不使用Nodemon:
|
|
在浏览器中访问这些URL:
- http://localhost:3000/non-blocking 显示简单的非阻塞消息
- http://localhost:3000/blocking 使用Worker Threads执行CPU密集型任务
理解问题
我们已经设置了一个基本的Express.js应用程序,这本质上是一个Node.js应用程序。在此应用程序中,我们定义了两个路由:
- /non-blocking
- /blocking
/non-blocking路由很简单:它只返回一个文本响应,说"This page is non-blocking."
另一方面,/blocking路由包含繁重的计算。它运行一个循环到十亿个数字,计算所有这些数字的总和,然后返回结果。
观察行为
如果您打开浏览器并访问http://localhost:3000/non-blocking URL,它工作得很好并立即响应。
但是如果您访问http://localhost:3000/blocking URL,页面会持续加载并且不会立即响应。
更有趣的是,如果您在/blocking仍在运行时尝试访问http://localhost:3000/non-blocking,它也会变得无响应。
这证明了一个关键概念:当/blocking路由正在执行时,即使是/non-blocking路由也无法响应。换句话说,/blocking中的繁重计算阻塞了Node.js事件循环,影响了所有其他路由。
为什么会发生这种情况?
原因在于Node.js的工作方式。Node.js本质上是一个JavaScript运行时,我们知道,JavaScript是一种单线程编程语言。自然地,Node.js默认也在单线程上运行。
那么,问题出现在哪里?当您执行/blocking路由时,所有JavaScript代码都在主线程上运行。在此期间,主线程完全忙碌或被阻塞。因此,如果另一个用户尝试访问/non-blocking路由,他们将不会得到任何响应,因为主线程仍然忙于先前的任务。
这就是为什么许多人错误地认为JavaScript很弱,因为它是单线程的。但这种看法并不完全准确。通过正确的方法和技术,JavaScript也可以以多线程方式使用,允许您处理繁重的计算而不会阻塞其他操作。
理解JavaScript执行
让我们思考一下JavaScript主要运行的主线程。您可能会问,JavaScript到底在哪里执行?JavaScript在JavaScript引擎内部运行,该引擎负责将JavaScript代码转换为机器代码。
在Node.js的情况下,它在V8引擎上运行,这与Google Chrome中使用的引擎相同。V8引擎完全在单线程上运行,意味着所有JavaScript代码仅在一个主线程内执行。
现在,您可能想知道:除了主线程之外还有其他线程吗?答案是肯定的。除了主线程之外,还有用于处理不同类型任务的额外线程。这些线程的管理和实现由一个名为Libuv的特殊库处理。
Libuv如何工作
Libuv设计用于与V8引擎一起工作。当V8引擎在主线程上执行JavaScript代码时,额外的线程用于处理不同类型的任务。例如,像数据库查询、网络请求或文件读/写任务之类的操作由这些额外线程处理,Libuv库管理并协调它们。
当我们执行此类任务时,它们实际上是在主线程之外的这些额外线程上执行的。Libuv指示V8引擎如何高效处理这些任务。这些任务通常称为输入/输出操作,或简称为I/O操作。换句话说,在执行文件读/写、数据库查询或网络请求时,这些I/O操作在单独的线程上执行,而不会阻塞主线程。
但是如果我们的任务像前面示例中的大型for循环,或者任何主要需要CPU处理的操作,它们不属于I/O操作。在这种情况下,任务必须在主线程上执行,这不可避免地会阻塞它直到任务完成。
Node.js的异步性质
考虑一个场景,客户端向主线程发送请求,此请求需要执行数据库查询。
当用户发送这样的请求时,数据库查询被发送到数据库,但重要的是,它不会阻塞主线程。相反,Libuv在单独的线程上处理数据库查询,保持主线程空闲以处理其他任务。
在这种情况下,如果另一个用户发送不涉及任何数据库查询或I/O操作的请求,它可以立即在主线程上执行。因此,第二个用户收到响应而没有任何延迟。
一旦在单独线程上运行的数据库查询完成,结果将返回到主线程,然后主线程将其作为响应发送回原始用户。这种方法确保用户高效接收输出,并且主线程保持可用于其他任务。
这整个过程代表了JavaScript和Node.js的异步性质。任务不是同步执行的——相反,它们异步运行。一个用户的请求可以在单独的线程上处理,而其他用户继续与服务器无缝交互。这就是Node.js即使在多个同时请求下也能保持高性能和响应性的方式。
CPU密集型问题
所以,这就是一切有效工作的方式。现在的问题是,如果主线程有一个任务不需要用户的任何数据库访问但需要大量CPU处理,会发生什么?在这种情况下,主线程将被阻塞。
假设主线程上的任务消耗大量CPU。如果我们直接在主线程上执行它,事件循环将被阻塞,其他请求将无法处理。
这就是Node.js中worker threads发挥作用的地方。使用worker threads,我们可以在主线程之外启动一个新线程来单独处理CPU繁重的操作。因此,主线程保持空闲,允许其他请求立即处理。
换句话说,通过使用worker threads,我们可以异步运行CPU密集型任务,确保服务器的吞吐量和响应性不受影响。
如何实现Worker Threads
如果我们查看之前的index.js文件,/blocking路由处理程序中的任务完全在主线程上运行,这就是它导致阻塞的原因。那么,我们如何解决这个问题?解决方案是使用Node.js的内置worker threads模块。
不需要安装任何外部包,因为worker threads是Node.js的核心模块。我们可以直接从worker_threads模块require Worker类并创建一个新的worker thread。
|
|
工作原理:
- 在/blocking路由处理程序内部,我们使用new Worker()创建一个新的worker并提供文件路径
- 此文件(worker.js)包含我们希望worker执行的CPU繁重任务
- 例如,我们繁重的for循环被移到此单独文件中
我们创建一个名为worker.js的新文件并将循环粘贴在那里:
|
|
当我们创建Worker时传递worker.js的路径,Node.js启动一个新线程。
这个新线程独立执行CPU密集型任务,保持主线程空闲以处理其他传入请求。
通过这样做,应用程序变得更具响应性,并且可以处理多个请求而不会阻塞。
线程间通信
在Node.js中,我们有主线程和额外的worker threads。为了在它们之间协调任务,我们可以使用消息系统。本质上,所有结果最终都需要到达主线程。否则,我们将无法向用户提供任何输出。
例如,假设您将任务分配给线程B,将另一个任务分配给线程C。当这些线程完成它们的任务时,它们必须通知主线程。它们通过消息系统发送消息来做到这一点。
将其想象成在收件箱中交换消息:线程C一旦完成任务就直接向主线程发送消息。通过这种通信,worker threads通知主线程任务完成并发送任何必要的数据。
这正是我们将在示例中使用的机制,通过worker threads处理CPU繁重任务,确保主线程保持空闲和响应。
设置Worker通信
所以,我们创建了一个worker.js文件。现在的问题是,我们如何通知主线程在此文件中完成的任务?
为了实现这一点,我们从Node.js中的内置worker_threads模块提取parentPort。parentPort是一个特殊对象,允许worker thread和主线程之间的通信。它充当桥梁:每当worker完成任务时,它可以通过此通道将结果发送回主线程。
任务完成后,我们使用parentPort.postMessage(result)方法发送最终数据。换句话说,我们向父线程发布消息,在我们的情况下,该消息是我们循环的计算结果。
这是worker.js文件的完整代码:
|
|
在这个例子中:
- 我们从worker_threads导入parentPort
- 我们执行繁重任务——一个计数到100亿的循环
- 完成循环后,我们使用parentPort.postMessage(result)将结果发送回主线程
这就是Node.js中worker thread和主线程之间通信的方式。
现在的问题是,一旦我们从worker发送数据,如何在index.js文件的/blocking处理程序中接收它?
为此,我们需要在处理程序内部设置一个监听器。为此,我们使用worker.on()方法。
那么,我们到底在监听什么?我们监听"message"事件——就像我们在JavaScript中监听onClick或其他事件一样。
worker.on()的第一个参数是事件名称(“message”),第二个参数是回调函数。在该回调内部,第一个参数代表我们从worker接收的数据。
一旦我们收到数据,我们可以使用以下命令将其发送回浏览器作为响应:
|
|
解释:
- worker.on(“message”, callback)监听使用parentPort.postMessage()从worker thread发送的消息
- data参数包含worker发送的结果
- 使用res.status(200).send(…),我们将计算结果发送回浏览器
这允许繁重计算在单独线程中发生,保持主线程空闲和响应。
同时,我们也应该处理可能的错误。
如果worker内部发生任何错误,我们可以以相同方式使用"error"事件监听它:
|
|
解释:
- worker.on(“error”, callback)专门监听worker thread内的错误
- err参数包含worker中出错的详细信息
- 使用res.status(400).send(…),我们将错误返回给客户端,这样请求就不会静默挂起
这是完整代码的样子:
|
|
一旦设置完成,您将看到戏剧性的变化。/blocking路由正在加载,但即使在加载时,重复刷新/non-blocking路由也完美工作,没有任何问题!
现在注意,/non-blocking路由是可访问的,这意味着即使/blocking路由仍在运行,它也不会影响任何东西。所以,我们已经成功解决了这个问题。我们将主要任务移到了主线程之外的单独线程。这意味着什么?主线程创建了一个新的worker thread并将CPU繁重任务分配给它。新线程现在独立工作,而主线程保持空闲。
最后,当新线程完成任务时,它也变为空闲。然后,通过消息系统,新线程通知主线程,“您的数据已准备好,这是您的数据。“主线程接收此数据并将其作为响应发送给客户端。
因此,为数据库查询或文件读写操作自动在单独线程上处理的任务——因为它们是I/O操作——我们现在已经手动启动了一个线程并使用它来处理类似的CPU繁重任务。
如何使用多核进行优化
既然您清楚地了解了过程的工作原理,让我们更进一步,使用多个CPU核心进行优化。
当您访问/blocking路由时,您可能会注意到它仍然需要相当长的时间来响应。这表明优化尚未完全完成。到目前为止,我们使用了一个单独的线程,意味着我们在主线程之外使用了一个CPU核心。但大多数现代机器有多个核心,我们可以利用这一点来提高性能。
检查系统有多少核心
在分配多个核心之前,您可以检查系统上有多少可用核心:
-
macOS(基于Unix):
1sysctl -n hw.ncpu此命令返回机器上的CPU核心总数。例如,在我的Mac上,它显示10,意味着我有十个可用核心。
-
Linux:
1nproc这将打印可用的处理单元数量。
-
Windows(命令提示符):
1echo %NUMBER_OF_PROCESSORS%
这些命令中的每一个都将帮助您确定可以用于并行处理的核心数量。
利用多个核心进行更快执行
一旦知道机器有多少核心,您可以决定为特定作业分配多少核心。例如,由于我的系统有十个核心,我可能选择为任务使用四个核心。
通过将工作负载分布在多个线程(每个线程在自己的核心上运行)上,您可以实现显著的性能改进。不仅仅依赖一个核心,系统可以同时执行任务的多个部分, dramatically减少总执行时间。
简而言之,您有效利用的核心越多,计算繁重任务完成的速度越快(只要您的代码设计为安全处理并行执行)。
如何实现多核优化
现在,我们将通过使用多个worker threads优化/blocking任务。首先,我们将创建现有文件的副本:
- index.js → index-optimized.js
- worker.js → worker-optimized.js
我们计划使用四个线程。即使机器可能有更多核心,使用所有核心可能会使系统过载,因此我们将其限制为四个。
index-optimize.js:
|
|
在这里,我们创建了一个返回Promise的createWorker函数。在它内部,创建了worker,并处理了message和error事件。在/blocking路由中,我们异步创建多个workers,使用Promise.all等待所有workers完成,然后对结果求和。
worker-optimize.js:
|
|
每个worker从主线程接收thread_count并计算其部分任务。完成后,它使用parentPort.postMessage发送结果回来。这样,繁重计算被分布,主线程保持空闲。
逐行理解代码
好吧,其中一些概念起初可能看起来有点复杂。但别担心!我们将逐行讲解所有代码,详细解释一切,以便您准确理解正在发生什么以及为什么。
线程规划和配置
现在,谈到重点我们将使用线程,对吧?我们计划使用多个线程。假设我们决定使用四个线程。我们的机器有十个核心,但我们不会全部使用,因为那将消耗我们所有的系统资源。所以,我们将使用来自四个可用核心的四个线程。
因此,在index-optimized.js文件中,我们创建了一个常量来存储我们将使用的线程数。假设我们在这里设置为4,以便以后其他开发人员可以根据需要轻松更改它。
createWorker函数
然后,我们创建了一个名为createWorker的新函数。此函数的目的是创建一个新的Worker。在这里,我们返回一个promise,因为创建Worker的过程是异步执行的。
这是因为当我们创建四个workers时,我们希望创建过程本身异步发生,这样主线程就不会被阻塞。毕竟,创建worker本质上是一个单独的过程。
最佳实践是异步创建workers。这就是为什么我们创建了createWorker函数,它返回一个promise。如我们所知,事件在promise内部被监听,其中使用resolve和reject。在/blocking处理程序中,我们可以通过此promise处理worker的结果或任何错误。
创建Worker
要创建worker,我们使用:
|
|
在这里,我们需要提供Worker文件的路径。然后,作为第二个参数,我们可以传递一些选项。例如,如果我们想向Worker发送一些数据,我们使用{ workerData }。在此workerData内部,我们将发送THREAD_COUNT,它存储在我们的文件中作为THREAD_COUNT。
例如,我们可以在workerData中传递一个对象,如:
|
|
当这个Worker被创建时,我们从index-optimized.js发送一些属性作为workerData。这是因为在worker-optimized.js中,worker可以使用parentPort知道它应该使用多少线程。所以,我们在workerData中包含了一个threadCount属性。当worker启动时,它从workerData读取threadCount并相应工作。这就是我们设计createWorker函数的方式,它简单地返回一个Promise。
事件处理和Promise结构
在这里,与我们原始的index.js文件相比,我们做了一个重要的更改。
由于我们将所有代码从index.js复制到index-optimized.js中,我们调整了/blocking路由处理程序。具体来说,我们从/blocking处理程序中删除了Worker的直接创建。相反,Worker现在在createWorker函数内部创建。
此外,之前在/blocking处理程序内部的所有事件监听器(message和error)也已移入createWorker函数。这意味着worker在函数内部完全管理,/blocking处理程序现在只处理promise结果,保持主线程清洁和有组织。
但是由于这些事件在promise内部被监听,我们无法直接从那里发送响应。我们将在/blocking处理程序内部发送响应。所以从Promise中,我们只使用resolve和reject。
例如:
|
|
换句话说,创建worker的整个过程已移入createWorker函数,该函数最终返回一个promise。
跨多个Workers分配工作
现在,在/blocking处理程序内部,我简单地调用createWorker函数。我们提供的workerData告诉worker它应该执行什么任务。创建的worker与worker-optimized.js文件中的parentPort链接,这基本上与父线程通信。
现在,我们希望在四个核心上分配运行到一百万的for循环。要使用的核心数量从index-optimized.js作为workerData的一部分发送。因为这些信息在workerData中,workers可以自动在它们之间分配和处理任务。
所以,在worker-optimized.js文件中,我们将使用以下命令获取workerData:
|
|
然后,在for-loop条件中,我们将使用workerData.threadCount。这意味着从index-optimized.js发送的threadCount将在这里使用,而不是硬编码4。这是最佳实践,因为数据在创建时传递给worker。在worker-optimized.js中,我们使用它将工作分成四个部分。然后,将创建四个workers,意味着createWorker函数将被调用四次。每个worker将承担一部分工作,最后,所有结果将被合并。这就是整个过程完成的方式。
所以,在这个/blocking处理程序中,我们的任务是收集四个promises的结果,然后对它们全部求和。假设我们将它们存储在名为workerPromises的数组中。此数组中的每个条目将保存一个worker的promise结果。然后,通过组合所有结果,我们得到最终结果。
由于我们需要创建四个Workers,我们将运行一个for-loop:for (let i = 0; i < THREAD_COUNT; i++)。在这个循环的主体内部,我们每次调用createWorker函数。这意味着在每次迭代中,都会创建一个新的worker,其promise被推入workerPromises数组。
所以,在这个循环的主体内部,我们将调用createWorker函数四次。每次调用createWorker返回一个promise。这四个promises被推入workerPromises数组,如workerPromises.push(createWorker())。这样,每个worker都有自己的promise。最后,由于所有promises都存储在workerPromises数组中,我们可以轻松调用Promise.all(workerPromises)。
所以,我们使用了threadResults = await Promise.all(workerPromises)。如我们所知,Promise.all可以一起处理多个Promises。在这里,我们传递了workerPromises数组,所以threadResults将包含四个promises的结果作为单独的元素,如threadResults[0]、threadResults[1]、threadResults[2]和threadResults[3]。然后,我们对这些结果求和以获得总计算,意味着threadResults[0] + threadResults[1] + threadResults[2] + threadResults[3]给出最终结果。由于我们使用了await,整个函数需要是async。
一旦一切正确完成,我们可以使用res.status(200).send(Result is ${total})将此总结果发送给客户端。这样,总计算正确工作,与以前不同。
所以,我希望现在清楚了:我们在这里调用了createWorker函数四次。每次调用返回一个promise。然后我们使用Promise.all一起等待所有这些promises,所以所有结果一次到达。之后,我们对这些结果求和。/blocking处理程序本质上是执行我们操作工作的那个。
处理复杂任务
所以,在worker-optimized.js文件中,我们基本上将工作分成了四个部分。但任务并不总是for-loop。也可能有不同类型的复杂任务,如图像处理、数据处理或分页。
在这种情况下,我们不能总是遵循相同的模式。所以,我们需要从index-optimized.js发送必要的数据作为workerData,worker将使用该数据在单独进程中执行任务。
在前面的例子中,所有步骤都是顺序的,所以简单地求和结果就给了我们总数。但在复杂任务的情况下,我们需要使用数据驱动的处理。
在其他复杂的应用程序中,您可能需要执行不同的任务。但主要概念很清楚:我们从这里发送的任何数据或属性将被worker接收,然后worker将分配工作。每个worker——无论您使用四个、五个还是六个——将处理其部分,所有结果都需要累积。这本质上是整个过程。
性能比较
在Node.js中处理CPU密集型任务时,使用worker threads分配工作可以显著提高性能。让我们比较优化前后应用程序的行为。
测试结果
运行index.js文件并在浏览器中点击/blocking路由需要相当长的时间。
运行index-optimized.js文件并点击相同路由需要相当少的时间——大约3秒。
停止它并再次运行index.js清楚地显示原始实现较慢。
性能指标
| 文件 | 路由 | 大约响应时间 | 备注 |
|---|---|---|---|
| index.js | /blocking | 长得多 | 这是原始实现。单线程循环阻塞事件循环,导致延迟。 |
| index-optimized.js | /blocking | 大约3秒 | 这里,工作被分配到多个worker threads,使过程快得多。 |
关键要点
这个比较证明了使用worker threads将工作分成多个部分如何使CPU密集型任务更高效,保持主线程响应并提高整体性能。
总结
所以,首先我们在index.js中看到了如何以非阻塞、异步的方式处理阻塞任务。也就是说,我们运行了一个worker thread,由于这个worker thread,主线程没有被阻塞,允许其他用户同时继续他们的任务。
多核挑战
但问题是,当我们在服务器上使用新线程时,不仅仅有单个核心。通常,有多个核心,如8、16或更多。要使用多个核心,我们首先需要找出服务器上有多少可用核心。
发现可用核心
如果服务器是Linux,我们可以使用nproc命令轻松找出核心总数。然后我们可以决定使用多少核心。例如,假设我们决定使用三个核心。在index-optimized.js中,我们实现了一种在这些核心之间分配工作的方式。
异步Worker创建
所以,我们所做的是将worker创建过程包装在promise中。由于创建worker需要一些时间并且启动它不是瞬时的,这个过程是异步完成的。这样,即使多个用户点击端点创建Workers,主线程也不会被阻塞。
如何实现多核优化
我们简单地创建了workers,然后在循环内部使用createWorker函数,我们生成了四个或指定数量的Workers基于线程数。每个worker独立发布消息,通过监听器,我们从每个worker接收数据。这些结果通过promises收集,一起存储在数组中,最后,我们对此数组中的所有结果求和以获得最终结果。
所以,其他概念都是基本JavaScript的一部分。我希望您现在理解worker threads如何工作以及我们如何在Node.js中使用多线程进程。这是一个优秀的概念和彻底学习的好机会。
我们学到了什么
Node.js中的Worker Threads提供了一种强大的方式来处理CPU密集型任务而不会阻塞主事件循环。通过利用多个核心并将工作分布在线程上,我们可以显著提高应用程序性能,同时保持对其他用户的响应性。
- 非阻塞执行:Worker threads防止主线程被阻塞
- 多核利用:我们可以利用多个CPU核心进行并行处理
- 异步worker创建:使用promises处理worker创建而不阻塞
- 结果聚合:从多个workers收集和组合结果
- 性能优化:跨多个线程分配繁重计算
这种方法对于需要处理计算密集型任务同时保持对用户请求响应的应用程序特别有价值。
最后的话
如果您发现这里的信息有价值,请随时与可能从中受益的其他人分享。我真的很感谢您的想法——在X @sumit_analyzen或Facebook @sumit.analyzen上提到我,观看我的编码教程,访问我的网站或简单地在LinkedIn上与我联系。
额外资源
您还可以查看Node.js Worker Threads文档以进行更深入的学习。您可以在GitHub存储库中找到本教程的所有源代码。如果它以任何方式帮助了您,请考虑给它一个星标以表示您的支持!