集群
源代码: lib/cluster.js
Node.js 进程集群可用于运行多个 Node.js 实例,这些实例可以在其应用程序线程之间分配工作负载。如果不需要进程隔离,请改用 worker_threads
模块,该模块允许在一个 Node.js 实例中运行多个应用程序线程。
cluster 模块允许轻松创建共享服务器端口的子进程。
import cluster from 'node:cluster'
import http from 'node:http'
import { availableParallelism } from 'node:os'
import process from 'node:process'
const numCPUs = availableParallelism()
if (cluster.isPrimary) {
console.log(`主进程 ${process.pid} 正在运行`)
// 分叉工作进程。
for (let i = 0; i < numCPUs; i++) {
cluster.fork()
}
cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 终止`)
})
} else {
// 工作进程可以共享任何 TCP 连接
// 在这种情况下,它是一个 HTTP 服务器
http
.createServer((req, res) => {
res.writeHead(200)
res.end('hello world\n')
})
.listen(8000)
console.log(`工作进程 ${process.pid} 已启动`)
}
const cluster = require('node:cluster')
const http = require('node:http')
const numCPUs = require('node:os').availableParallelism()
const process = require('node:process')
if (cluster.isPrimary) {
console.log(`主进程 ${process.pid} 正在运行`)
// 分叉工作进程。
for (let i = 0; i < numCPUs; i++) {
cluster.fork()
}
cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 终止`)
})
} else {
// 工作进程可以共享任何 TCP 连接
// 在这种情况下,它是一个 HTTP 服务器
http
.createServer((req, res) => {
res.writeHead(200)
res.end('hello world\n')
})
.listen(8000)
console.log(`工作进程 ${process.pid} 已启动`)
}
现在运行 Node.js 将在工作进程之间共享端口 8000:
$ node server.js
主进程 3596 正在运行
工作进程 4324 已启动
工作进程 4520 已启动
工作进程 6056 已启动
工作进程 5644 已启动
在 Windows 上,目前尚无法在工作进程中设置命名管道服务器。
工作原理
工作进程使用 child_process.fork()
方法生成,以便它们可以通过 IPC 与父进程通信并来回传递服务器句柄。
集群模块支持两种分发传入连接的方法。
第一种方法(也是除 Windows 外所有平台上的默认方法)是循环法,其中主进程监听一个端口,接受新的连接并将它们以循环方式分发到各个工作进程,并具有一些内置的智能功能来避免过载工作进程。
第二种方法是主进程创建监听套接字并将其发送给感兴趣的工作进程。然后,工作进程直接接受传入的连接。
理论上,第二种方法应该提供最佳性能。然而,在实践中,由于操作系统调度程序的变幻莫测,分布往往非常不平衡。在观察到的负载中,总共八个进程中,超过 70% 的连接最终都集中在两个进程中。
因为 server.listen()
将大部分工作交给主进程,所以正常 Node.js 进程和集群工作进程之间的行为存在三种差异:
Node.js 不提供路由逻辑。因此,务必设计一个应用程序,使其不依赖于内存中的数据对象来处理会话和登录等内容。
因为工作进程都是单独的进程,所以可以根据程序的需要杀死或重新生成它们,而不会影响其他工作进程。只要有一些工作进程仍然存活,服务器将继续接受连接。如果没有任何工作进程存活,则现有连接将被断开,新连接将被拒绝。但是,Node.js 不会自动管理工作进程的数量。管理基于自身需求的工作进程池是应用程序的责任。
虽然 node:cluster
模块的主要用例是网络,但它也可以用于需要工作进程的其他用例。
类: Worker
新增于: v0.7.0
- 继承自: <EventEmitter>
Worker
对象包含关于工作进程的所有公共信息和方法。在主进程中,它可以通过 cluster.workers
获取。在工作进程中,它可以通过 cluster.worker
获取。
事件: 'disconnect'
新增于: v0.7.7
类似于 cluster.on('disconnect')
事件,但特定于此工作进程。
cluster.fork().on('disconnect', () => {
// 工作进程已断开连接
})
事件: 'error'
新增于: v0.7.3
此事件与 child_process.fork()
提供的事件相同。
在工作进程中,也可以使用 process.on('error')
。
事件: 'exit'
新增于: v0.11.2
类似于 cluster.on('exit')
事件,但特定于此工作进程。
import cluster from 'node:cluster'
if (cluster.isPrimary) {
const worker = cluster.fork()
worker.on('exit', (code, signal) => {
if (signal) {
console.log(`工作进程被信号终止: ${signal}`)
} else if (code !== 0) {
console.log(`工作进程以错误代码退出: ${code}`)
} else {
console.log('工作进程成功!')
}
})
}
const cluster = require('node:cluster')
if (cluster.isPrimary) {
const worker = cluster.fork()
worker.on('exit', (code, signal) => {
if (signal) {
console.log(`工作进程被信号终止: ${signal}`)
} else if (code !== 0) {
console.log(`工作进程以错误代码退出: ${code}`)
} else {
console.log('工作进程成功!')
}
})
}
事件: 'listening'
新增于: v0.7.0
address
<Object>
类似于 cluster.on('listening')
事件,但特定于此工作进程。
cluster.fork().on('listening', address => {
// 工作进程正在监听
})
cluster.fork().on('listening', address => {
// 工作进程正在监听
})
它不会在工作进程中发出。
事件: 'message'
新增于: v0.7.0
message
<Object>handle
<undefined> | <Object>
类似于 cluster
的 'message'
事件,但特定于此工作进程。
在工作进程中,也可以使用 process.on('message')
。
这是一个使用消息系统的示例。它在主进程中跟踪工作进程接收到的 HTTP 请求数量:
import cluster from 'node:cluster'
import http from 'node:http'
import { availableParallelism } from 'node:os'
import process from 'node:process'
if (cluster.isPrimary) {
// 跟踪 http 请求
let numReqs = 0
setInterval(() => {
console.log(`numReqs = ${numReqs}`)
}, 1000)
// 计数请求
function messageHandler(msg) {
if (msg.cmd && msg.cmd === 'notifyRequest') {
numReqs += 1
}
}
// 启动工作进程并监听包含 notifyRequest 的消息
const numCPUs = availableParallelism()
for (let i = 0; i < numCPUs; i++) {
cluster.fork()
}
for (const id in cluster.workers) {
cluster.workers[id].on('message', messageHandler)
}
} else {
// 工作进程具有一个 http 服务器。
http
.Server((req, res) => {
res.writeHead(200)
res.end('hello world\n')
// 通知主进程请求
process.send({ cmd: 'notifyRequest' })
})
.listen(8000)
}
const cluster = require('node:cluster')
const http = require('node:http')
const numCPUs = require('node:os').availableParallelism()
const process = require('node:process')
if (cluster.isPrimary) {
// 跟踪 http 请求
let numReqs = 0
setInterval(() => {
console.log(`numReqs = ${numReqs}`)
}, 1000)
// 计数请求
function messageHandler(msg) {
if (msg.cmd && msg.cmd === 'notifyRequest') {
numReqs += 1
}
}
// 启动工作进程并监听包含 notifyRequest 的消息
for (let i = 0; i < numCPUs; i++) {
cluster.fork()
}
for (const id in cluster.workers) {
cluster.workers[id].on('message', messageHandler)
}
} else {
// 工作进程具有一个 http 服务器。
http
.Server((req, res) => {
res.writeHead(200)
res.end('hello world\n')
// 通知主进程请求
process.send({ cmd: 'notifyRequest' })
})
.listen(8000)
}
事件:'online'
新增于:v0.7.0
类似于 cluster.on('online')
事件,但特定于此工作进程。
cluster.fork().on('online', () => {
// 工作进程已上线
})
此事件不会在工作进程中发出。
worker.disconnect()
[历史]
版本 | 变更 |
---|---|
v7.3.0 | 此方法现在返回 worker 的引用。 |
v0.7.7 | 新增于:v0.7.7 |
- 返回值:<cluster.Worker>
worker
的引用。
在工作进程中,此函数将关闭所有服务器,等待这些服务器上的 'close'
事件,然后断开 IPC 通道。
在主进程中,会向工作进程发送一条内部消息,使其自行调用 .disconnect()
。
导致 .exitedAfterDisconnect
被设置。
服务器关闭后,将不再接受新连接,但任何其他正在监听的工作进程都可能接受连接。现有连接将按常例关闭。当不再存在连接时(参见 server.close()
),与工作进程的 IPC 通道将关闭,允许其优雅地退出。
以上仅适用于服务器连接,客户端连接不会被工作进程自动关闭,并且 disconnect 不会等待它们关闭后再退出。
在工作进程中,存在 process.disconnect
,但它不是此函数;它是 disconnect()
。
由于长期存在的服务器连接可能会阻止工作进程断开连接,因此发送消息可能很有用,以便可以采取特定于应用程序的操作来关闭它们。实现超时也可能很有用,如果在一段时间后未发出 'disconnect'
事件,则会杀死工作进程。
if (cluster.isPrimary) {
const worker = cluster.fork()
let timeout
worker.on('listening', address => {
worker.send('shutdown')
worker.disconnect()
timeout = setTimeout(() => {
worker.kill()
}, 2000)
})
worker.on('disconnect', () => {
clearTimeout(timeout)
})
} else if (cluster.isWorker) {
const net = require('node:net')
const server = net.createServer(socket => {
// 连接永不结束
})
server.listen(8000)
process.on('message', msg => {
if (msg === 'shutdown') {
// 启动服务器任何连接的优雅关闭
}
})
}
worker.exitedAfterDisconnect
新增于:v6.0.0
如果 worker 进程由于 .disconnect()
而退出,则此属性为 true
。如果 worker 进程以其他任何方式退出,则为 false
。如果 worker 进程尚未退出,则为 undefined
。
布尔值 worker.exitedAfterDisconnect
允许区分自愿退出和意外退出,主进程可以根据此值选择是否重新生成 worker 进程。
cluster.on('exit', (worker, code, signal) => {
if (worker.exitedAfterDisconnect === true) {
console.log('哦,这只是自愿的——不用担心')
}
})
// 终止 worker 进程
worker.kill()
worker.id
新增于:v0.8.0
每个新的 worker 进程都会获得其自己的唯一 ID,此 ID 存储在 id
中。
当 worker 进程处于活动状态时,这是在 cluster.workers
中索引它的键。
worker.isConnected()
新增于:v0.11.14
如果工作进程通过其 IPC 通道连接到主进程,则此函数返回 true
,否则返回 false
。工作进程在其创建后连接到主进程。在发出 'disconnect'
事件后,它将断开连接。
worker.isDead()
新增于:v0.11.14
如果工作进程已终止(由于退出或被信号终止),则此函数返回 true
。否则,返回 false
。
import cluster from 'node:cluster'
import http from 'node:http'
import { availableParallelism } from 'node:os'
import process from 'node:process'
const numCPUs = availableParallelism()
if (cluster.isPrimary) {
console.log(`主进程 ${process.pid} 正在运行`)
// 派生工作进程。
for (let i = 0; i < numCPUs; i++) {
cluster.fork()
}
cluster.on('fork', worker => {
console.log('工作进程已终止:', worker.isDead())
})
cluster.on('exit', (worker, code, signal) => {
console.log('工作进程已终止:', worker.isDead())
})
} else {
// 工作进程可以共享任何 TCP 连接。在本例中,它是一个 HTTP 服务器。
http
.createServer((req, res) => {
res.writeHead(200)
res.end(`当前进程\n ${process.pid}`)
process.kill(process.pid)
})
.listen(8000)
}
const cluster = require('node:cluster')
const http = require('node:http')
const numCPUs = require('node:os').availableParallelism()
const process = require('node:process')
if (cluster.isPrimary) {
console.log(`主进程 ${process.pid} 正在运行`)
// 派生工作进程。
for (let i = 0; i < numCPUs; i++) {
cluster.fork()
}
cluster.on('fork', worker => {
console.log('工作进程已终止:', worker.isDead())
})
cluster.on('exit', (worker, code, signal) => {
console.log('工作进程已终止:', worker.isDead())
})
} else {
// 工作进程可以共享任何 TCP 连接。在本例中,它是一个 HTTP 服务器。
http
.createServer((req, res) => {
res.writeHead(200)
res.end(`当前进程\n ${process.pid}`)
process.kill(process.pid)
})
.listen(8000)
}
worker.kill([signal])
新增于: v0.9.12
signal
<string> 发送到工作进程的终止信号名称。默认值:'SIGTERM'
此函数将终止工作进程。在主工作进程中,它通过断开 worker.process
的连接来实现,断开连接后,使用 signal
进行终止。在工作进程中,它通过使用 signal
终止进程来实现。
kill()
函数会终止工作进程,而不会等待优雅断开连接,它的行为与 worker.process.kill()
相同。
出于向后兼容性考虑,此方法被用作 worker.destroy()
的别名。
在工作进程中,存在 process.kill()
,但它并非此函数;它是 kill()
。
worker.process
新增于: v0.7.0
所有工作进程都是使用 child_process.fork()
创建的,此函数返回的对象存储为 .process
。在工作进程中,存储的是全局 process
。
参见:子进程模块。
如果 process
上发生 'disconnect'
事件且 .exitedAfterDisconnect
不是 true
,则工作进程将调用 process.exit(0)
。这可以防止意外断开连接。
worker.send(message[, sendHandle[, options]][, callback])
[历史]
版本 | 变更 |
---|---|
v4.0.0 | 现在支持 callback 参数。 |
v0.7.0 | v0.7.0 版本中添加 |
message
<对象>sendHandle
<句柄>options
<对象> 如果存在,options
参数是一个对象,用于参数化某些类型句柄的发送。options
支持以下属性:keepOpen
<布尔值> 在传递net.Socket
实例时可用的值。当值为true
时,套接字在发送进程中保持打开状态。默认值:false
。
callback
<函数>返回值: <布尔值>
向工作进程或主进程发送消息,可以选择性地包含句柄。
在主进程中,这会将消息发送到特定工作进程。它与 ChildProcess.send()
相同。
在工作进程中,这会将消息发送到主进程。它与 process.send()
相同。
此示例将回显来自主进程的所有消息:
if (cluster.isPrimary) {
const worker = cluster.fork()
worker.send('hi there')
} else if (cluster.isWorker) {
process.on('message', msg => {
process.send(msg)
})
}
事件:'disconnect'
新增于:v0.7.9
worker
<cluster.Worker>
当工作进程 IPC 通道断开连接后发出此事件。这可能发生在工作进程优雅退出、被杀死或手动断开连接(例如使用 worker.disconnect()
)时。
'disconnect'
和 'exit'
事件之间可能存在延迟。这些事件可用于检测进程是否卡在清理过程中或是否存在长期连接。
cluster.on('disconnect', worker => {
console.log(`工作进程 #${worker.id} 已断开连接`)
})
事件:'exit'
新增于:v0.7.9
worker
<cluster.Worker>code
<number> 退出代码,如果正常退出则存在。signal
<string> 导致进程被杀死的信号名称(例如'SIGHUP'
)。
当任何工作进程死亡时,cluster
模块将发出 'exit'
事件。
这可用于通过再次调用 .fork()
来重新启动工作进程。
cluster.on('exit', (worker, code, signal) => {
console.log('工作进程 %d 死亡 (%s)。正在重新启动...', worker.process.pid, signal || code)
cluster.fork()
})
事件: 'fork'
新增于: v0.7.0
worker
<cluster.Worker>
当一个新的工作进程被派生时,cluster
模块将发出一个 'fork'
事件。这可以用来记录工作进程活动,并创建自定义超时。
const timeouts = []
function errorMsg() {
console.error('Something must be wrong with the connection ...')
}
cluster.on('fork', worker => {
timeouts[worker.id] = setTimeout(errorMsg, 2000)
})
cluster.on('listening', (worker, address) => {
clearTimeout(timeouts[worker.id])
})
cluster.on('exit', (worker, code, signal) => {
clearTimeout(timeouts[worker.id])
errorMsg()
})
事件: 'listening'
新增于: v0.7.0
worker
<cluster.Worker>address
<Object>
在工作进程中调用 listen()
后,当服务器上发出 'listening'
事件时,主进程中的 cluster
也将发出 'listening'
事件。
事件处理程序使用两个参数执行,worker
包含工作进程对象,address
对象包含以下连接属性:address
、port
和 addressType
。如果工作进程监听多个地址,这将非常有用。
cluster.on('listening', (worker, address) => {
console.log(`A worker is now connected to ${address.address}:${address.port}`)
})
addressType
的值为:
4
(TCPv4)6
(TCPv6)-1
(Unix 域套接字)'udp4'
或'udp6'
(UDPv4 或 UDPv6)
事件:'message'
[历史]
版本 | 变更 |
---|---|
v6.0.0 | 现在传递 worker 参数;详情见下文。 |
v2.5.0 | 新增于:v2.5.0 |
worker
<cluster.Worker>message
<Object>handle
<undefined> | <Object>
当集群主进程从任何子进程接收消息时发出。
参见 child_process
事件:'message'
。
事件:'online'
新增于:v0.7.0
worker
<cluster.Worker>
分叉一个新的子进程后,子进程应该用在线消息进行响应。当主进程接收到在线消息时,它将发出此事件。'fork'
和 'online'
之间的区别在于,fork
在主进程分叉子进程时发出,而 'online'
在子进程运行时发出。
cluster.on('online', worker => {
console.log('Yay, the worker responded after it was forked')
})
事件: 'setup'
新增于: v0.7.1
settings
<对象>
每次调用 .setupPrimary()
时都会发出此事件。
settings
对象是在调用 .setupPrimary()
时 cluster.settings
对象的副本,仅供参考,因为在单个 tick 中可以多次调用 .setupPrimary()
。
如果需要精确的值,请使用 cluster.settings
。
cluster.disconnect([callback])
新增于: v0.7.7
callback
<函数> 所有 worker 断开连接且句柄关闭时调用。
对 cluster.workers
中的每个 worker 调用 .disconnect()
。
断开连接后,所有内部句柄都将关闭,如果没有任何其他事件等待,则允许主进程优雅地退出。
此方法接受一个可选的回调参数,该参数在完成时将被调用。
此方法只能从主进程调用。
cluster.fork([env])
新增于:v0.6.0
env
<Object> 添加到 worker 进程环境的键值对。- 返回值: <cluster.Worker>
生成一个新的 worker 进程。
这只能在主进程中调用。
cluster.isMaster
新增于:v0.8.1
自 v16.0.0 起已弃用
[
cluster.isPrimary](/zh/api/cluster#clusterisprimary)
的已弃用别名。
cluster.isPrimary
新增于:v16.0.0
如果进程为主进程则为真。这由 process.env.NODE_UNIQUE_ID
确定。如果 process.env.NODE_UNIQUE_ID
未定义,则 isPrimary
为 true
。
cluster.isWorker
新增于:v0.6.0
如果进程不是主进程则为真(它是 cluster.isPrimary
的否定)。
cluster.schedulingPolicy
新增于:v0.11.2
调度策略,可以是 cluster.SCHED_RR
(轮询)或 cluster.SCHED_NONE
(留给操作系统处理)。这是一个全局设置,一旦第一个工作进程生成或调用了 .setupPrimary()
(以先发生者为准),就会被冻结。
SCHED_RR
是除 Windows 外所有操作系统的默认值。一旦 libuv 能够有效地分发 IOCP 句柄而不会造成较大的性能损失,Windows 将切换到 SCHED_RR
。
cluster.schedulingPolicy
也可以通过 NODE_CLUSTER_SCHED_POLICY
环境变量设置。有效值为 'rr'
和 'none'
。
cluster.settings
[历史记录]
版本 | 变更 |
---|---|
v13.2.0, v12.16.0 | 现在支持 serialization 选项。 |
v9.5.0 | 现在支持 cwd 选项。 |
v9.4.0 | 现在支持 windowsHide 选项。 |
v8.2.0 | 现在支持 inspectPort 选项。 |
v6.4.0 | 现在支持 stdio 选项。 |
v0.7.1 | 新增于:v0.7.1 |
- <对象>
execArgv
<字符串数组> 传递给 Node.js 可执行文件的字符串参数列表。默认值:process.execArgv
。exec
<字符串> 工作进程文件的路径。默认值:process.argv[1]
。args
<字符串数组> 传递给工作进程的字符串参数。默认值:process.argv.slice(2)
。cwd
<字符串> 工作进程的当前工作目录。默认值:undefined
(继承自父进程)。serialization
<字符串> 指定进程间消息传递使用的序列化类型。可能的值为'json'
和'advanced'
。详情请参阅 用于child_process
的高级序列化。默认值:false
。silent
<布尔值> 是否将输出发送到父进程的标准输入/输出流。默认值:false
。stdio
<数组> 配置派生进程的标准输入/输出流。由于集群模块依赖于 IPC 才能工作,此配置必须包含'ipc'
条目。提供此选项时,它将覆盖silent
。参见child_process.spawn()
的stdio
。uid
<数字> 设置进程的用户标识。(参见setuid(2)
。)gid
<数字> 设置进程的组标识。(参见setgid(2)
。)inspectPort
<数字> | <函数> 设置工作进程的检查器端口。这可以是一个数字,也可以是一个不带参数并返回数字的函数。默认情况下,每个工作进程都会获得它自己的端口,从主进程的process.debugPort
开始递增。windowsHide
<布尔值> 隐藏通常在 Windows 系统上创建的派生进程控制台窗口。默认值:false
。
调用 .setupPrimary()
(或 .fork()
)后,此设置对象将包含设置,包括默认值。
此对象不打算手动更改或设置。
cluster.setupMaster([settings])
[历史]
版本 | 变更 |
---|---|
v16.0.0 | 自 v16.0.0 起已弃用 |
v6.4.0 | 现在支持 stdio 选项。 |
v0.7.1 | 在 v0.7.1 中添加 |
cluster.setupPrimary()
的已弃用别名。
cluster.setupPrimary([settings])
在 v16.0.0 中添加
settings
<对象> 参见cluster.settings
。
setupPrimary
用于更改默认的 'fork' 行为。调用后,设置将出现在 cluster.settings
中。
任何设置更改仅影响对 .fork()
的未来调用,并且对已运行的 worker 没有影响。
worker 的唯一无法通过 .setupPrimary()
设置的属性是传递给 .fork()
的 env
。
上面的默认值仅适用于第一次调用;后续调用的默认值是调用 cluster.setupPrimary()
时的当前值。
import cluster from 'node:cluster'
cluster.setupPrimary({
exec: 'worker.js',
args: ['--use', 'https'],
silent: true,
})
cluster.fork() // https worker
cluster.setupPrimary({
exec: 'worker.js',
args: ['--use', 'http'],
})
cluster.fork() // http worker
const cluster = require('node:cluster')
cluster.setupPrimary({
exec: 'worker.js',
args: ['--use', 'https'],
silent: true,
})
cluster.fork() // https worker
cluster.setupPrimary({
exec: 'worker.js',
args: ['--use', 'http'],
})
cluster.fork() // http worker
这只能从主进程中调用。
cluster.worker
新增于:v0.7.0
当前工作进程对象的引用。主进程中不可用。
import cluster from 'node:cluster'
if (cluster.isPrimary) {
console.log('I am primary')
cluster.fork()
cluster.fork()
} else if (cluster.isWorker) {
console.log(`I am worker #${cluster.worker.id}`)
}
const cluster = require('node:cluster')
if (cluster.isPrimary) {
console.log('I am primary')
cluster.fork()
cluster.fork()
} else if (cluster.isWorker) {
console.log(`I am worker #${cluster.worker.id}`)
}
cluster.workers
新增于:v0.7.0
一个存储活动工作进程对象的哈希表,键为 id
字段。这使得遍历所有工作进程变得很容易。它仅在主进程中可用。
工作进程在断开连接并且退出后,将从 cluster.workers
中移除。这两个事件之间的顺序无法预先确定。但是,保证从 cluster.workers
列表中移除发生在最后一个 'disconnect'
或 'exit'
事件发出之前。
import cluster from 'node:cluster'
for (const worker of Object.values(cluster.workers)) {
worker.send('big announcement to all workers')
}
const cluster = require('node:cluster')
for (const worker of Object.values(cluster.workers)) {
worker.send('big announcement to all workers')
}