日常工作中,對(duì) Node.js 的使用都比較粗淺,趁未羊之際,來(lái)學(xué)點(diǎn)稍微高級(jí)的,那就先從 cluster 開(kāi)始吧。
尼古拉斯張三說(shuō)過(guò),“帶著問(wèn)題去學(xué)習(xí)是一個(gè)比較好的方法”,所以我們也來(lái)試一試。
當(dāng)初使用 cluster 時(shí),一直好奇它是怎么做到多個(gè)子進(jìn)程監(jiān)聽(tīng)同一個(gè)端口而不沖突的,比如下面這段代碼:
const cluster = require('cluster') const net = require('net') const cpus = require('os').cpus() if (cluster.isPrimary) { for (let i = 0; i < cpus.length; i++) { cluster.fork() } } else { net .createServer(function (socket) { socket.on('data', function (data) { socket.write(`Reply from ${process.pid}: ` + data.toString()) }) socket.on('end', function () { console.log('Close') }) socket.write('Hello!n') }) .listen(9999) }
該段代碼通過(guò)父進(jìn)程 fork
出了多個(gè)子進(jìn)程,且這些子進(jìn)程都監(jiān)聽(tīng)了 9999 這個(gè)端口并能正常提供服務(wù),這是如何做到的呢?我們來(lái)研究一下?!鞠嚓P(guān)教程推薦:nodejs視頻教程、編程教學(xué)】
準(zhǔn)備調(diào)試環(huán)境
學(xué)習(xí) Node.js 官方提供庫(kù)最好的方式當(dāng)然是調(diào)試一下,所以,我們先來(lái)準(zhǔn)備一下環(huán)境。注:本文的操作系統(tǒng)為 macOS Big Sur 11.6.6,其他系統(tǒng)請(qǐng)自行準(zhǔn)備相應(yīng)環(huán)境。
編譯 Node.js
-
下載 Node.js 源碼
git clone https://github.com/nodejs/node.git
然后在下面這兩個(gè)地方加入斷點(diǎn),方便后面調(diào)試用:
// lib/internal/cluster/primary.js function queryServer(worker, message) { debugger; // Stop processing if worker already disconnecting if (worker.exitedAfterDisconnect) return; ... }
// lib/internal/cluster/child.js send(message, (reply, handle) => { debugger if (typeof obj._setServerData === 'function') obj._setServerData(reply.data) if (handle) { // Shared listen socket shared(reply, {handle, indexesKey, index}, cb) } else { // Round-robin. rr(reply, {indexesKey, index}, cb) } })
-
進(jìn)入目錄,執(zhí)行
./configure --debug make -j4
之后會(huì)生成 out/Debug/node
準(zhǔn)備 IDE 環(huán)境
使用 vscode 調(diào)試,配置好 launch.json
就可以了(其他 IDE 類(lèi)似,請(qǐng)自行解決):
{ "version": "0.2.0", "configurations": [ { "name": "Debug C++", "type": "cppdbg", "program": "/Users/youxingzhi/ayou/node/out/Debug/node", "request": "launch", "args": ["/Users/youxingzhi/ayou/node/index.js"], "stopAtEntry": false, "cwd": "${workspaceFolder}", "environment": [], "externalConsole": false, "MIMode": "lldb" }, { "name": "Debug Node", "type": "node", "runtimeExecutable": "/Users/youxingzhi/ayou/node/out/Debug/node", "request": "launch", "args": ["--expose-internals", "--nolazy"], "skipFiles": [], "program": "${workspaceFolder}/index.js" } ] }
其中第一個(gè)是用于調(diào)式 C++ 代碼(需要安裝 C/C++ 插件),第二個(gè)用于調(diào)式 JS 代碼。接下來(lái)就可以開(kāi)始調(diào)試了,我們暫時(shí)用調(diào)式 JS 代碼的那個(gè)配置就好了。
Cluster 源碼調(diào)試
準(zhǔn)備好調(diào)試代碼(為了調(diào)試而已,這里啟動(dòng)一個(gè)子進(jìn)程就夠了):
debugger const cluster = require('cluster') const net = require('net') if (cluster.isPrimary) { debugger cluster.fork() } else { const server = net.createServer(function (socket) { socket.on('data', function (data) { socket.write(`Reply from ${process.pid}: ` + data.toString()) }) socket.on('end', function () { console.log('Close') }) socket.write('Hello!n') }) debugger server.listen(9999) }
很明顯,我們的程序可以分父進(jìn)程和子進(jìn)程這兩部分來(lái)進(jìn)行分析。
首先進(jìn)入的是父進(jìn)程:
執(zhí)行 require('cluster')
時(shí),會(huì)進(jìn)入 lib/cluster.js
這個(gè)文件:
const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'primary' module.exports = require(`internal/cluster/${childOrPrimary}`)
會(huì)根據(jù)當(dāng)前 process.env
上是否有 NODE_UNIQUE_ID
來(lái)引入不同的模塊,此時(shí)是沒(méi)有的,所以會(huì)引入 internal/cluster/primary.js
這個(gè)模塊:
... const cluster = new EventEmitter(); ... module.exports = cluster const handles = new SafeMap() cluster.isWorker = false cluster.isMaster = true // Deprecated alias. Must be same as isPrimary. cluster.isPrimary = true cluster.Worker = Worker cluster.workers = {} cluster.settings = {} cluster.SCHED_NONE = SCHED_NONE // Leave it to the operating system. cluster.SCHED_RR = SCHED_RR // Primary distributes connections. ... cluster.schedulingPolicy = schedulingPolicy cluster.setupPrimary = function (options) { ... } // Deprecated alias must be same as setupPrimary cluster.setupMaster = cluster.setupPrimary function setupSettingsNT(settings) { ... } function createWorkerProcess(id, env) { ... } function removeWorker(worker) { ... } function removeHandlesForWorker(worker) { ... } cluster.fork = function (env) { ... }
該模塊主要是在 cluster
對(duì)象上掛載了一些屬性和方法,并導(dǎo)出,這些后面回過(guò)頭再看,我們繼續(xù)往下調(diào)試。往下調(diào)試會(huì)進(jìn)入 if (cluster.isPrimary)
分支,代碼很簡(jiǎn)單,僅僅是 fork
出了一個(gè)新的子進(jìn)程而已:
// lib/internal/cluster/primary.js cluster.fork = function (env) { cluster.setupPrimary() const id = ++ids const workerProcess = createWorkerProcess(id, env) const worker = new Worker({ id: id, process: workerProcess, }) ... worker.process.on('internalMessage', internal(worker, onmessage)) process.nextTick(emitForkNT, worker) cluster.workers[worker.id] = worker return worker }
cluster.setupPrimary()
:比較簡(jiǎn)單,初始化一些參數(shù)啥的。
createWorkerProcess(id, env)
:
// lib/internal/cluster/primary.js function createWorkerProcess(id, env) { const workerEnv = {...process.env, ...env, NODE_UNIQUE_ID: `${id}`} const execArgv = [...cluster.settings.execArgv] ... return fork(cluster.settings.exec, cluster.settings.args, { cwd: cluster.settings.cwd, env: workerEnv, serialization: cluster.settings.serialization, silent: cluster.settings.silent, windowsHide: cluster.settings.windowsHide, execArgv: execArgv, stdio: cluster.settings.stdio, gid: cluster.settings.gid, uid: cluster.settings.uid, }) }
可以看到,該方法主要是通過(guò) fork
啟動(dòng)了一個(gè)子進(jìn)程來(lái)執(zhí)行我們的 index.js
,且啟動(dòng)子進(jìn)程的時(shí)候設(shè)置了環(huán)境變量 NODE_UNIQUE_ID
,這樣 index.js
中 require('cluster')
的時(shí)候,引入的就是 internal/cluster/child.js
模塊了。
worker.process.on('internalMessage', internal(worker, onmessage))
:監(jiān)聽(tīng)子進(jìn)程傳遞過(guò)來(lái)的消息并處理。
接下來(lái)就進(jìn)入了子進(jìn)程的邏輯:
前面說(shuō)了,此時(shí)引入的是 internal/cluster/child.js
模塊,我們先跳過(guò),繼續(xù)往下,執(zhí)行 server.listen(9999)
時(shí)實(shí)際上是調(diào)用了 Server
上的方法:
// lib/net.js Server.prototype.listen = function (...args) { ... listenInCluster( this, null, options.port | 0, 4, backlog, undefined, options.exclusive ); }
可以看到,最終是調(diào)用了 listenInCluster
:
// lib/net.js function listenInCluster( server, address, port, addressType, backlog, fd, exclusive, flags, options ) { exclusive = !!exclusive if (cluster === undefined) cluster = require('cluster') if (cluster.isPrimary || exclusive) { // Will create a new handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags) return } const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags, backlog, ...options, } // Get the primary's server handle, and listen on it cluster._getServer(server, serverQuery, listenOnPrimaryHandle) function listenOnPrimaryHandle(err, handle) { err = checkBindError(err, port, handle) if (err) { const ex = exceptionWithHostPort(err, 'bind', address, port) return server.emit('error', ex) } // Reuse primary's server handle server._handle = handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags) } }
由于是在子進(jìn)程中執(zhí)行,所以最后會(huì)調(diào)用 cluster._getServer(server, serverQuery, listenOnPrimaryHandle)
:
// lib/internal/cluster/child.js // 這里的 cb 就是上面的 listenOnPrimaryHandle cluster._getServer = function (obj, options, cb) { ... send(message, (reply, handle) => { debugger if (typeof obj._setServerData === 'function') obj._setServerData(reply.data) if (handle) { // Shared listen socket shared(reply, {handle, indexesKey, index}, cb) } else { // Round-robin. rr(reply, {indexesKey, index}, cb) } }) ... }
該函數(shù)最終會(huì)向父進(jìn)程發(fā)送 queryServer
的消息,父進(jìn)程處理完后會(huì)調(diào)用回調(diào)函數(shù),回調(diào)函數(shù)中會(huì)調(diào)用 cb
即 listenOnPrimaryHandle
??磥?lái),listen
的邏輯是在父進(jìn)程中進(jìn)行的了。
接下來(lái)進(jìn)入父進(jìn)程:
父進(jìn)程收到 queryServer
的消息后,最終會(huì)調(diào)用 queryServer
這個(gè)方法:
// lib/internal/cluster/primary.js function queryServer(worker, message) { // Stop processing if worker already disconnecting if (worker.exitedAfterDisconnect) return const key = `${message.address}:${message.port}:${message.addressType}:` + `${message.fd}:${message.index}` let handle = handles.get(key) if (handle === undefined) { let address = message.address // Find shortest path for unix sockets because of the ~100 byte limit if ( message.port < 0 && typeof address === 'string' && process.platform !== 'win32' ) { address = path.relative(process.cwd(), address) if (message.address.length < address.length) address = message.address } // UDP is exempt from round-robin connection balancing for what should // be obvious reasons: it's connectionless. There is nothing to send to // the workers except raw datagrams and that's pointless. if ( schedulingPolicy !== SCHED_RR || message.addressType === 'udp4' || message.addressType === 'udp6' ) { handle = new SharedHandle(key, address, message) } else { handle = new RoundRobinHandle(key, address, message) } handles.set(key, handle) } ... }
可以看到,這里主要是對(duì) handle
的處理,這里的 handle
指的是調(diào)度策略,分為 SharedHandle
和 RoundRobinHandle
,分別對(duì)應(yīng)搶占式和輪詢(xún)兩種策略(文章最后補(bǔ)充部分有關(guān)于兩者對(duì)比的例子)。
Node.js 中默認(rèn)是 RoundRobinHandle
策略,可通過(guò)環(huán)境變量 NODE_CLUSTER_SCHED_POLICY
來(lái)修改,取值可以為 none
(SharedHandle
) 或 rr
(RoundRobinHandle
)。
SharedHandle
首先,我們來(lái)看一下 SharedHandle
,由于我們這里是 TCP
協(xié)議,所以最后會(huì)通過(guò) net._createServerHandle
創(chuàng)建一個(gè) TCP
對(duì)象掛載在 handle
屬性上(注意這里又有一個(gè) handle
,別搞混了):
// lib/internal/cluster/shared_handle.js function SharedHandle(key, address, {port, addressType, fd, flags}) { this.key = key this.workers = new SafeMap() this.handle = null this.errno = 0 let rval if (addressType === 'udp4' || addressType === 'udp6') rval = dgram._createSocketHandle(address, port, addressType, fd, flags) else rval = net._createServerHandle(address, port, addressType, fd, flags) if (typeof rval === 'number') this.errno = rval else this.handle = rval }
在 createServerHandle
中除了創(chuàng)建 TCP
對(duì)象外,還綁定了端口和地址:
// lib/net.js function createServerHandle(address, port, addressType, fd, flags) { ... } else { handle = new TCP(TCPConstants.SERVER); isTCP = true; } if (address || port || isTCP) { ... err = handle.bind6(address, port, flags); } else { err = handle.bind(address, port); } } ... return handle; }
然后,queryServer
中繼續(xù)執(zhí)行,會(huì)調(diào)用 add
方法,最終會(huì)將 handle
也就是 TCP
對(duì)象傳遞給子進(jìn)程:
// lib/internal/cluster/primary.js function queryServer(worker, message) { ... if (!handle.data) handle.data = message.data // Set custom server data handle.add(worker, (errno, reply, handle) => { const {data} = handles.get(key) if (errno) handles.delete(key) // Gives other workers a chance to retry. send( worker, { errno, key, ack: message.seq, data, ...reply, }, handle // TCP 對(duì)象 ) }) ... }
之后進(jìn)入子進(jìn)程:
子進(jìn)程收到父進(jìn)程對(duì)于 queryServer
的回復(fù)后,會(huì)調(diào)用 shared
:
// lib/internal/cluster/child.js // `obj` is a net#Server or a dgram#Socket object. cluster._getServer = function (obj, options, cb) { ... send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data) if (handle) { // Shared listen socket shared(reply, {handle, indexesKey, index}, cb) } else { // Round-robin. rr(reply, {indexesKey, index}, cb) // cb 是 listenOnPrimaryHandle } }) ... }
shared
中最后會(huì)調(diào)用 cb
也就是 listenOnPrimaryHandle
:
// lib/net.js function listenOnPrimaryHandle(err, handle) { err = checkBindError(err, port, handle) if (err) { const ex = exceptionWithHostPort(err, 'bind', address, port) return server.emit('error', ex) } // Reuse primary's server handle 這里的 server 是 index.js 中 net.createServer 返回的那個(gè)對(duì)象 server._handle = handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags) }
這里會(huì)把 handle
賦值給 server._handle
,這里的 server
是 index.js
中 net.createServer
返回的那個(gè)對(duì)象,并調(diào)用 server._listen2
,也就是 setupListenHandle
:
// lib/net.js function setupListenHandle(address, port, addressType, backlog, fd, flags) { debug('setupListenHandle', address, port, addressType, backlog, fd) // If there is not yet a handle, we need to create one and bind. // In the case of a server sent via IPC, we don't need to do this. if (this._handle) { debug('setupListenHandle: have a handle already') } else { ... } this[async_id_symbol] = getNewAsyncId(this._handle) this._handle.onconnection = onconnection this._handle[owner_symbol] = this // Use a backlog of 512 entries. We pass 511 to the listen() call because // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1); // which will thus give us a backlog of 512 entries. const err = this._handle.listen(backlog || 511) if (err) { const ex = uvExceptionWithHostPort(err, 'listen', address, port) this._handle.close() this._handle = null defaultTriggerAsyncIdScope( this[async_id_symbol], process.nextTick, emitErrorNT, this, ex ) return } }
首先會(huì)執(zhí)行 this._handle.onconnection = onconnection
,由于客戶(hù)端請(qǐng)求過(guò)來(lái)時(shí)會(huì)調(diào)用 this._handle
(也就是 TCP
對(duì)象)上的 onconnection
方法,也就是會(huì)執(zhí)行lib/net.js
中的 onconnection
方法建立連接,之后就可以通信了。為了控制篇幅,該方法就不繼續(xù)往下了。
然后調(diào)用 listen
監(jiān)聽(tīng),注意這里參數(shù) backlog
跟之前不同,不是表示端口,而是表示在拒絕連接之前,操作系統(tǒng)可以掛起的最大連接數(shù)量,也就是連接請(qǐng)求的排隊(duì)數(shù)量。我們平時(shí)遇到的 listen EADDRINUSE: address already in use
錯(cuò)誤就是因?yàn)檫@行代碼返回了非 0 的錯(cuò)誤。
如果還有其他子進(jìn)程,也會(huì)同樣走一遍上述的步驟,不同之處是在主進(jìn)程中 queryServer
時(shí),由于已經(jīng)有 handle
了,不需要再重新創(chuàng)建了:
function queryServer(worker, message) { debugger; // Stop processing if worker already disconnecting if (worker.exitedAfterDisconnect) return; const key = `${message.address}:${message.port}:${message.addressType}:` + `${message.fd}:${message.index}`; let handle = handles.get(key); ... }
以上內(nèi)容整理成流程圖如下:
所謂的 SharedHandle
,其實(shí)是在多個(gè)子進(jìn)程中共享 TCP
對(duì)象的句柄,當(dāng)客戶(hù)端請(qǐng)求過(guò)來(lái)時(shí),多個(gè)進(jìn)程會(huì)去競(jìng)爭(zhēng)該請(qǐng)求的處理權(quán),會(huì)導(dǎo)致任務(wù)分配不均的問(wèn)題,這也是為什么需要 RoundRobinHandle
的原因。接下來(lái)繼續(xù)看看這種調(diào)度方式。
RoundRobinHandle
// lib/internal/cluster/round_robin_handle.js function RoundRobinHandle( key, address, {port, fd, flags, backlog, readableAll, writableAll} ) { ... this.server = net.createServer(assert.fail) ... else if (port >= 0) { this.server.listen({ port, host: address, // Currently, net module only supports `ipv6Only` option in `flags`. ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY), backlog, }) } ... this.server.once('listening', () => { this.handle = this.server._handle this.handle.onconnection = (err, handle) => { this.distribute(err, handle) } this.server._handle = null this.server = null }) }
如上所示,RoundRobinHandle
會(huì)調(diào)用 net.createServer()
創(chuàng)建一個(gè) server
,然后調(diào)用 listen
方法,最終會(huì)來(lái)到 setupListenHandle
:
// lib/net.js function setupListenHandle(address, port, addressType, backlog, fd, flags) { debug('setupListenHandle', address, port, addressType, backlog, fd) // If there is not yet a handle, we need to create one and bind. // In the case of a server sent via IPC, we don't need to do this. if (this._handle) { debug('setupListenHandle: have a handle already') } else { debug('setupListenHandle: create a handle') let rval = null // Try to bind to the unspecified IPv6 address, see if IPv6 is available if (!address && typeof fd !== 'number') { rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags) if (typeof rval === 'number') { rval = null address = DEFAULT_IPV4_ADDR addressType = 4 } else { address = DEFAULT_IPV6_ADDR addressType = 6 } } if (rval === null) rval = createServerHandle(address, port, addressType, fd, flags) if (typeof rval === 'number') { const error = uvExceptionWithHostPort(rval, 'listen', address, port) process.nextTick(emitErrorNT, this, error) return } this._handle = rval } this[async_id_symbol] = getNewAsyncId(this._handle) this._handle.onconnection = onconnection this._handle[owner_symbol] = this ... }
且由于此時(shí) this._handle
為空,會(huì)調(diào)用 createServerHandle()
生成一個(gè) TCP
對(duì)象作為 _handle
。之后就跟 SharedHandle
一樣了,最后也會(huì)回到子進(jìn)程:
// lib/internal/cluster/child.js // `obj` is a net#Server or a dgram#Socket object. cluster._getServer = function (obj, options, cb) { ... send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data) if (handle) { // Shared listen socket shared(reply, {handle, indexesKey, index}, cb) } else { // Round-robin. rr(reply, {indexesKey, index}, cb) // cb 是 listenOnPrimaryHandle } }) ... }
不過(guò)由于 RoundRobinHandle
不會(huì)傳遞 handle
給子進(jìn)程,所以此時(shí)會(huì)執(zhí)行 rr
:
function rr(message, {indexesKey, index}, cb) { ... // Faux handle. Mimics a TCPWrap with just enough fidelity to get away // with it. Fools net.Server into thinking that it's backed by a real // handle. Use a noop function for ref() and unref() because the control // channel is going to keep the worker alive anyway. const handle = {close, listen, ref: noop, unref: noop} if (message.sockname) { handle.getsockname = getsockname // TCP handles only. } assert(handles.has(key) === false) handles.set(key, handle) debugger cb(0, handle) }
可以看到,這里構(gòu)造了一個(gè)假的 handle
,然后執(zhí)行 cb
也就是 listenOnPrimaryHandle
。最終跟 SharedHandle
一樣會(huì)調(diào)用 setupListenHandle
執(zhí)行 this._handle.onconnection = onconnection
。
RoundRobinHandle
邏輯到此就結(jié)束了,好像缺了點(diǎn)什么的樣子?;仡櫹?,我們給每個(gè)子進(jìn)程中的 server
上都掛載了一個(gè)假的 handle
,但它跟綁定了端口的 TCP
對(duì)象沒(méi)有任何關(guān)系,如果客戶(hù)端請(qǐng)求過(guò)來(lái)了,是不會(huì)執(zhí)行它上面的 onconnection
方法的。之所以要這樣寫(xiě),估計(jì)是為了保持跟之前 SharedHandle
代碼邏輯的統(tǒng)一。
此時(shí),我們需要回到 RoundRobinHandle
,有這樣一段代碼:
// lib/internal/cluster/round_robin_handle.js this.server.once('listening', () => { this.handle = this.server._handle this.handle.onconnection = (err, handle) => { this.distribute(err, handle) } this.server._handle = null this.server = null })
在 listen
執(zhí)行完后,會(huì)觸發(fā) listening
事件的回調(diào),這里重寫(xiě)了 handle
上面的 onconnection
。
所以,當(dāng)客戶(hù)端請(qǐng)求過(guò)來(lái)時(shí),會(huì)調(diào)用 distribute
在多個(gè)子進(jìn)程中輪詢(xún)分發(fā),這里又有一個(gè) handle
,這里的 handle
姑且理解為 clientHandle
,即客戶(hù)端連接的 handle
,別搞混了??傊詈髸?huì)將這個(gè) clientHandle
發(fā)送給子進(jìn)程:
// lib/internal/cluster/round_robin_handle.js RoundRobinHandle.prototype.handoff = function (worker) { ... const message = { act: 'newconn', key: this.key }; // 這里的 handle 是 clientHandle sendHelper(worker.process, message, handle, (reply) => { if (reply.accepted) handle.close(); else this.distribute(0, handle); // Worker is shutting down. Send to another. this.handoff(worker); }); };
而子進(jìn)程在 require('cluster')
時(shí),已經(jīng)監(jiān)聽(tīng)了該事件:
// lib/internal/cluster/child.js process.on('internalMessage', internal(worker, onmessage)) send({act: 'online'}) function onmessage(message, handle) { if (message.act === 'newconn') onconnection(message, handle) else if (message.act === 'disconnect') ReflectApply(_disconnect, worker, [true]) }
最終也同樣會(huì)走到 net.js
中的 function onconnection(err, clientHandle)
方法。這個(gè)方法第二個(gè)參數(shù)名就叫 clientHandle
,這也是為什么前面的 handle
我想叫這個(gè)名字的原因。
還是用圖來(lái)總結(jié)下:
跟 SharedHandle
不同的是,該調(diào)度策略中 onconnection
最開(kāi)始是在主進(jìn)程中觸發(fā)的,然后通過(guò)輪詢(xún)算法挑選一個(gè)子進(jìn)程,將 clientHandle
傳遞給它。
為什么端口不沖突
cluster 模塊的調(diào)試就到此告一段落了,接下來(lái)我們來(lái)回答一下一開(kāi)始的問(wèn)題,為什么多個(gè)進(jìn)程監(jiān)聽(tīng)同一個(gè)端口沒(méi)有報(bào)錯(cuò)?
網(wǎng)上有些文章說(shuō)是因?yàn)樵O(shè)置了 SO_REUSEADDR
,但其實(shí)跟這個(gè)沒(méi)關(guān)系。通過(guò)上面的分析知道,不管什么調(diào)度策略,最終都只會(huì)在主進(jìn)程中對(duì) TCP
對(duì)象 bind
一次。
我們可以修改一下源代碼來(lái)測(cè)試一下:
// deps/uv/src/unix/tcp.c 下面的 SO_REUSEADDR 改成 SO_DEBUG if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
編譯后執(zhí)行發(fā)現(xiàn),我們?nèi)匀豢梢哉J褂?cluster 模塊。
那這個(gè) SO_REUSEADDR
到底影響的是啥呢?我們繼續(xù)來(lái)研究一下。
SO_REUSEADDR
首先,我們我們知道,下面的代碼是會(huì)報(bào)錯(cuò)的:
const net = require('net') const server1 = net.createServer() const server2 = net.createServer() server1.listen(9999) server2.listen(9999)
但是,如果我稍微修改一下,就不會(huì)報(bào)錯(cuò)了:
const net = require('net') const server1 = net.createServer() const server2 = net.createServer() server1.listen(9999, '127.0.0.1') server2.listen(9999, '10.53.48.67')
原因在于 listen
時(shí),如果不指定 address
,則相當(dāng)于綁定了所有地址,當(dāng)兩個(gè) server 都這樣做時(shí),請(qǐng)求到來(lái)就不知道要給誰(shuí)處理了。
我們可以類(lèi)比成找對(duì)象,port
是對(duì)外貌的要求,address
是對(duì)城市的要求?,F(xiàn)在甲乙都想要一個(gè) port
是 1米7以上
不限城市的對(duì)象,那如果有一個(gè) 1米7以上
來(lái)自 深圳
的對(duì)象,就不知道介紹給誰(shuí)了。而如果兩者都指定了城市就好辦多了。
那如果一個(gè)指定了 address
,一個(gè)沒(méi)有呢?就像下面這樣:
const net = require('net') const server1 = net.createServer() const server2 = net.createServer() server1.listen(9999, '127.0.0.1') server2.listen(9999)
結(jié)果是:設(shè)置了 SO_REUSEADDR
可以正常運(yùn)行,而修改成 SO_DEBUG
的會(huì)報(bào)錯(cuò)。
還是上面的例子,甲對(duì)城市沒(méi)有限制,乙需要是來(lái)自 深圳
的,那當(dāng)一個(gè)對(duì)象來(lái)自 深圳
,我們可以選擇優(yōu)先介紹給乙,非 深圳
的就選擇介紹給甲,這個(gè)就是 SO_REUSEADDR
的作用。
補(bǔ)充
SharedHandle
和 RoundRobinHandle
兩種模式的對(duì)比
先準(zhǔn)備下測(cè)試代碼:
// cluster.js const cluster = require('cluster') const net = require('net') if (cluster.isMaster) { for (let i = 0; i < 4; i++) { cluster.fork() } } else { const server = net.createServer() server.on('connection', (socket) => { console.log(`PID: ${process.pid}!`) }) server.listen(9997) }
// client.js const net = require('net') for (let i = 0; i < 20; i++) { net.connect({port: 9997}) }
RoundRobin先執(zhí)行 node cluster.js
,然后執(zhí)行 node client.js
,會(huì)看到如下輸出,可以看到?jīng)]有任何一個(gè)進(jìn)程的 PID 是緊挨著的。至于為什么沒(méi)有一直按照一樣的順序,后面再研究一下。
PID: 42904! PID: 42906! PID: 42905! PID: 42904! PID: 42907! PID: 42905! PID: 42906! PID: 42907! PID: 42904! PID: 42905! PID: 42906! PID: 42907! PID: 42904! PID: 42905! PID: 42906! PID: 42907! PID: 42904! PID: 42905! PID: 42906! PID: 42904!
Shared
先執(zhí)行 NODE_CLUSTER_SCHED_POLICY=none node cluster.js
,則 Node.js 會(huì)使用 SharedHandle
,然后執(zhí)行 node client.js
,會(huì)看到如下輸出,可以看到同一個(gè) PID 連續(xù)輸出了多次,所以這種策略會(huì)導(dǎo)致進(jìn)程任務(wù)分配不均的現(xiàn)象。就像公司里有些人忙到 996,有些人天天摸魚(yú),這顯然不是老板愿意看到的現(xiàn)象,所以不推薦使用。
PID: 42561! PID: 42562! PID: 42561! PID: 42562! PID: 42564! PID: 42561! PID: 42562! PID: 42563! PID: 42561! PID: 42562! PID: 42563! PID: 42564! PID: 42564! PID: 42564! PID: 42564! PID: 42564! PID: 42563! PID: 42563! PID: 42564! PID: 42563!