進(jìn)程間怎么進(jìn)行通信?下面本篇文章給大家介紹一下Node進(jìn)程間通信的原理,希望對(duì)大家有所幫助!
前端(vue)入門到精通課程:進(jìn)入學(xué)習(xí)
前置知識(shí)
文件描述符
在 Linux 系統(tǒng)中,一切都看成文件,當(dāng)進(jìn)程打開現(xiàn)有文件時(shí),會(huì)返回一個(gè)文件描述符。 文件描述符是操作系統(tǒng)為了管理已經(jīng)被進(jìn)程打開的文件所創(chuàng)建的索引,用來指向被打開的文件。 當(dāng)我們的進(jìn)程啟動(dòng)之后,操作系統(tǒng)會(huì)給每一個(gè)進(jìn)程分配一個(gè) PCB 控制塊,PCB 中會(huì)有一個(gè)文件描述符表,存放當(dāng)前進(jìn)程所有的文件描述符,即當(dāng)前進(jìn)程打開的所有文件。
? 進(jìn)程中的文件描述符是如何和系統(tǒng)文件對(duì)應(yīng)起來的? 在內(nèi)核中,系統(tǒng)會(huì)維護(hù)另外兩種表
- 打開文件表(Open file table)
- i-node 表(i-node table)
文件描述符就是數(shù)組的下標(biāo),從0開始往上遞增,0/1/2 默認(rèn)是我們的輸入/輸出/錯(cuò)誤流的文件描述符 在 PCB 中維護(hù)的文件描述表中,可以根據(jù)文件描述符找到對(duì)應(yīng)了文件指針,找到對(duì)應(yīng)的打開文件表 打開文件表中維護(hù)了:文件偏移量(讀寫文件的時(shí)候會(huì)更新);對(duì)于文件的狀態(tài)標(biāo)識(shí);指向 i-node 表的指針 想要真正的操作文件,還得靠 i-node 表,能夠獲取到真實(shí)文件的相關(guān)信息
他們之間的關(guān)系
圖解
- 在進(jìn)程 A 中,文件描述符1/20均指向了同一打開文件表項(xiàng)23,這可能是對(duì)同一文件多次調(diào)用了 open 函數(shù)形成的
- 進(jìn)程 A/B 的文件描述符2都指向同一文件,這可能是調(diào)用了 fork 創(chuàng)建子進(jìn)程,A/B 是父子關(guān)系進(jìn)程
- 進(jìn)程 A 的文件描述符0和進(jìn)程 B 的文件描述符指向了不同的打開文件表項(xiàng),但這些表項(xiàng)指向了同一個(gè)文件,這可能是 A/B 進(jìn)程分別對(duì)同一文件發(fā)起了 open 調(diào)用
總結(jié)
- 同一進(jìn)程的不同文件描述符可以指向同一個(gè)文件
- 不同進(jìn)程可以擁有相同的文件描述符
- 不同進(jìn)程的同一文件描述符可以指向不同的文件
- 不同進(jìn)程的不同文件描述符可以指向同一個(gè)文件
文件描述符的重定向
每次讀寫進(jìn)程的時(shí)候,都是從文件描述符下手,找到對(duì)應(yīng)的打開文件表項(xiàng),再找到對(duì)應(yīng)的 i-node 表
?如何實(shí)現(xiàn)文件描述符重定向? 因?yàn)樵谖募枋龇碇?,能夠找到?duì)應(yīng)的文件指針,如果我們改變了文件指針,是不是后續(xù)的兩個(gè)表內(nèi)容就發(fā)生了改變 例如:文件描述符1指向的顯示器,那么將文件描述符1指向 log.txt 文件,那么文件描述符 1 也就和 log.txt 對(duì)應(yīng)起來了
shell 對(duì)文件描述符的重定向
> 是輸出重定向符號(hào),< 是輸入重定向符號(hào),它們是文件描述符操作符 > 和 < 通過修改文件描述符改變了文件指針的指向,來能夠?qū)崿F(xiàn)重定向的功能
我們使用cat hello.txt
時(shí),默認(rèn)會(huì)將結(jié)果輸出到顯示器上,使用 > 來重定向。cat hello.txt 1 > log.txt
以輸出的方式打開文件 log.txt,并綁定到文件描述符1上
c函數(shù)對(duì)文件描述符的重定向
dup
dup 函數(shù)是用來打開一個(gè)新的文件描述符,指向和 oldfd 同一個(gè)文件,共享文件偏移量和文件狀態(tài)
int main(int argc, char const *argv[]) { int fd = open("log.txt"); int copyFd = dup(fd); //將fd閱讀文件置于文件末尾,計(jì)算偏移量。 cout << "fd = " << fd << " 偏移量: " << lseek(fd, 0, SEEK_END) << endl; //現(xiàn)在我們計(jì)算copyFd的偏移量 cout << "copyFd = " << copyFd << "偏移量:" << lseek(copyFd, 0, SEEK_CUR) << endl; return 0; }
調(diào)用 dup(3) 的時(shí)候,會(huì)打開新的最小描述符,也就是4,這個(gè)4指向了3所指向的文件,操作任意一個(gè) fd 都是修改的一個(gè)文件
dup2
dup2 函數(shù),把指定的 newfd 也指向 oldfd 指向的文件。執(zhí)行完dup2之后,newfd 和 oldfd 同時(shí)指向同一個(gè)文件,共享文件偏移量和文件狀態(tài)
int main(int argc, char const *argv[]) { int fd = open("log.txt"); int copyFd = dup(fd); //將fd閱讀文件置于文件末尾,計(jì)算偏移量。 cout << "fd = " << fd << " 偏移量: " << lseek(fd, 0, SEEK_END) << endl; //現(xiàn)在我們計(jì)算copyFd的偏移量 cout << "copyFd = " << copyFd << "偏移量:" << lseek(copyFd, 0, SEEK_CUR) << endl; return 0; }
Node中通信原理
Node 中的 IPC 通道具體實(shí)現(xiàn)是由 libuv 提供的。根據(jù)系統(tǒng)的不同實(shí)現(xiàn)方式不同,window 下采用命名管道實(shí)現(xiàn),*nix 下采用 Domain Socket 實(shí)現(xiàn)。在應(yīng)用層只體現(xiàn)為 message 事件和 send 方法?!鞠嚓P(guān)教程推薦:nodejs視頻教程】
父進(jìn)程在實(shí)際創(chuàng)建子進(jìn)程之前,會(huì)創(chuàng)建 IPC 通道并監(jiān)聽它,等到創(chuàng)建出真實(shí)的子進(jìn)程后,通過環(huán)境變量(NODE_CHANNEL_FD)告訴子進(jìn)程該 IPC 通道的文件描述符。
子進(jìn)程在啟動(dòng)的過程中,會(huì)根據(jù)該文件描述符去連接 IPC 通道,從而完成父子進(jìn)程的連接。
建立連接之后可以自由的通信了,IPC 通道是使用命名管道或者 Domain Socket 創(chuàng)建的,屬于雙向通信。并且它是在系統(tǒng)內(nèi)核中完成的進(jìn)程通信
⚠️ 只有在啟動(dòng)的子進(jìn)程是 Node 進(jìn)程時(shí),子進(jìn)程才會(huì)根據(jù)環(huán)境變量去連接對(duì)應(yīng)的 IPC 通道,對(duì)于其他類型的子進(jìn)程則無法實(shí)現(xiàn)進(jìn)程間通信,除非其他進(jìn)程也按著該約定去連接這個(gè) IPC 通道。
unix domain socket
是什么
我們知道經(jīng)典的通信方式是有 Socket,我們平時(shí)熟知的 Socket 是基于網(wǎng)絡(luò)協(xié)議的,用于兩個(gè)不同主機(jī)上的兩個(gè)進(jìn)程通信,通信需要指定 IP/Host 等。 但如果我們同一臺(tái)主機(jī)上的兩個(gè)進(jìn)程想要通信,如果使用 Socket 需要指定 IP/Host,經(jīng)過網(wǎng)絡(luò)協(xié)議等,會(huì)顯得過于繁瑣。所以 Unix Domain Socket 誕生了。
UDS 的優(yōu)勢(shì):
- 綁定 socket 文件而不是綁定 IP/Host;不需要經(jīng)過網(wǎng)絡(luò)協(xié)議,而是數(shù)據(jù)的拷貝
- 也支持 SOCK_STREAM(流套接字)和 SOCK_DGRAM(數(shù)據(jù)包套接字),但由于是在本機(jī)通過內(nèi)核通信,不會(huì)丟包也不會(huì)出現(xiàn)發(fā)送包的次序和接收包的次序不一致的問題
如何實(shí)現(xiàn)
流程圖
Server 端
int main(int argc, char *argv[]) { int server_fd ,ret, client_fd; struct sockaddr_un serv, client; socklen_t len = sizeof(client); char buf[1024] = {0}; int recvlen; // 創(chuàng)建 socket server_fd = socket(AF_LOCAL, SOCK_STREAM, 0); // 初始化 server 信息 serv.sun_family = AF_LOCAL; strcpy(serv.sun_path, "server.sock"); // 綁定 ret = bind(server_fd, (struct sockaddr *)&serv, sizeof(serv)); //設(shè)置監(jiān)聽,設(shè)置能夠同時(shí)和服務(wù)端連接的客戶端數(shù)量 ret = listen(server_fd, 36); //等待客戶端連接 client_fd = accept(server_fd, (struct sockaddr *)&client, &len); printf("=====client bind file:%sn", client.sun_path); while (1) { recvlen = recv(client_fd, buf, sizeof(buf), 0); if (recvlen == -1) { perror("recv error"); return -1; } else if (recvlen == 0) { printf("client disconnet...n"); close(client_fd); break; } else { printf("recv buf %sn", buf); send(client_fd, buf, recvlen, 0); } } close(client_fd); close(server_fd); return 0; }
Client 端
int main(int argc, char *argv[]) { int client_fd ,ret; struct sockaddr_un serv, client; socklen_t len = sizeof(client); char buf[1024] = {0}; int recvlen; //創(chuàng)建socket client_fd = socket(AF_LOCAL, SOCK_STREAM, 0); //給客戶端綁定一個(gè)套接字文件 client.sun_family = AF_LOCAL; strcpy(client.sun_path, "client.sock"); ret = bind(client_fd, (struct sockaddr *)&client, sizeof(client)); //初始化server信息 serv.sun_family = AF_LOCAL; strcpy(serv.sun_path, "server.sock"); //連接 connect(client_fd, (struct sockaddr *)&serv, sizeof(serv)); while (1) { fgets(buf, sizeof(buf), stdin); send(client_fd, buf, strlen(buf)+1, 0); recv(client_fd, buf, sizeof(buf), 0); printf("recv buf %sn", buf); } close(client_fd); return 0; }
命名管道(Named Pipe)
是什么
命名管道是可以在同一臺(tái)計(jì)算機(jī)的不同進(jìn)程之間,或者跨越一個(gè)網(wǎng)絡(luò)的不同計(jì)算機(jī)的不同進(jìn)程之間的可靠的單向或者雙向的數(shù)據(jù)通信。 創(chuàng)建命名管道的進(jìn)程被稱為管道服務(wù)端(Pipe Server),連接到這個(gè)管道的進(jìn)程稱為管道客戶端(Pipe Client)。
命名管道的命名規(guī)范:serverpipe[path]name
- 其中 server 指定一個(gè)服務(wù)器的名字,本機(jī)適用 . 表示,192.10.10.1 表示網(wǎng)絡(luò)上的服務(wù)器
- pipe 是一個(gè)不可變化的字串,用于指定該文件屬于 NPFS(Named Pipe File System)
- [path]name 是唯一命名管道名稱的標(biāo)識(shí)
怎么實(shí)現(xiàn)
流程圖
Pipe Server
void ServerTest() { HANDLE serverNamePipe; char pipeName[MAX_PATH] = {0}; char szReadBuf[MAX_BUFFER] = {0}; char szWriteBuf[MAX_BUFFER] = {0}; DWORD dwNumRead = 0; DWORD dwNumWrite = 0; strcpy(pipeName, "\\.\pipe\shuangxuPipeTest"); // 創(chuàng)建管道實(shí)例 serverNamePipe = CreateNamedPipeA(pipeName, PIPE_ACCESS_DUPLEX|FILE_FLAG_WRITE_THROUGH, PIPE_TYPE_BYTE|PIPE_READMODE_BYTE|PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, 0, 0, 0, NULL); WriteLog("創(chuàng)建管道成功..."); // 等待客戶端連接 BOOL bRt= ConnectNamedPipe(serverNamePipe, NULL ); WriteLog( "收到客戶端的連接成功..."); // 接收數(shù)據(jù) memset( szReadBuf, 0, MAX_BUFFER ); bRt = ReadFile(serverNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL ); // 業(yè)務(wù)邏輯處理 (只為測(cè)試用返回原來的數(shù)據(jù)) WriteLog( "收到客戶數(shù)據(jù):[%s]", szReadBuf); // 發(fā)送數(shù)據(jù) if( !WriteFile(serverNamePipe, szWriteBuf, dwNumRead, &dwNumWrite, NULL ) ) { WriteLog("向客戶寫入數(shù)據(jù)失敗:[%#x]", GetLastError()); return ; } WriteLog("寫入數(shù)據(jù)成功..."); }
Pipe Client
void ClientTest() { char pipeName[MAX_PATH] = {0}; HANDLE clientNamePipe; DWORD dwRet; char szReadBuf[MAX_BUFFER] = {0}; char szWriteBuf[MAX_BUFFER] = {0}; DWORD dwNumRead = 0; DWORD dwNumWrite = 0; strcpy(pipeName, "\\.\pipe\shuangxuPipeTest"); // 檢測(cè)管道是否可用 if(!WaitNamedPipeA(pipeName, 10000)){ WriteLog("管道[%s]無法打開", pipeName); return ; } // 連接管道 clientNamePipe = CreateFileA(pipeName, GENERIC_READ|GENERIC_WRITE, 0, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); WriteLog("管道連接成功..."); scanf( "%s", szWritebuf ); // 發(fā)送數(shù)據(jù) if( !WriteFile(clientNamePipe, szWriteBuf, strlen(szWriteBuf), &dwNumWrite, NULL)){ WriteLog("發(fā)送數(shù)據(jù)失敗,GetLastError=[%#x]", GetLastError()); return ; } printf("發(fā)送數(shù)據(jù)成功:%sn", szWritebuf ); // 接收數(shù)據(jù) if( !ReadFile(clientNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL)){ WriteLog("接收數(shù)據(jù)失敗,GetLastError=[%#x]", GetLastError() ); return ; } WriteLog( "接收到服務(wù)器返回:%s", szReadBuf ); // 關(guān)閉管道 CloseHandle(clientNamePipe); }
Node 創(chuàng)建子進(jìn)程的流程
Unix
對(duì)于創(chuàng)建子進(jìn)程、創(chuàng)建管道、重定向管道均是在 c++ 層實(shí)現(xiàn)的
創(chuàng)建子進(jìn)程
int main(int argc,char *argv[]){ pid_t pid = fork(); if (pid < 0) { // 錯(cuò)誤 } else if(pid == 0) { // 子進(jìn)程 } else { // 父進(jìn)程 } }
創(chuàng)建管道
使用 socketpair 創(chuàng)建管道,其創(chuàng)建出來的管道是全雙工的,返回的文件描述符中的任何一個(gè)都可讀和可寫
int main () { int fd[2]; int r = socketpair(AF_UNIX, SOCK_STREAM, 0, fd); if (fork()){ /* 父進(jìn)程 */ int val = 0; close(fd[1]); while (1){ sleep(1); ++val; printf("發(fā)送數(shù)據(jù): %dn", val); write(fd[0], &val, sizeof(val)); read(fd[0], &val, sizeof(val)); printf("接收數(shù)據(jù): %dn", val); } } else { /*子進(jìn)程*/ int val; close(fd[0]); while(1){ read(fd[1], &val, sizeof(val)); ++val; write(fd[1], &val, sizeof(val)); } } }
當(dāng)我們使用 socketpair 創(chuàng)建了管道之后,父進(jìn)程關(guān)閉了 fd[1],子進(jìn)程關(guān)閉了 fd[0]。子進(jìn)程可以通過 fd[1] 讀寫數(shù)據(jù);同理主進(jìn)程通過 fd[0]讀寫數(shù)據(jù)完成通信。
對(duì)應(yīng)代碼:https://github.com/nodejs/node/blob/main/deps/uv/src/unix/process.c#L344
child_process.fork 的詳細(xì)調(diào)用
fork 函數(shù)開啟一個(gè)子進(jìn)程的流程
-
初始化參數(shù)中的 options.stdio,并且調(diào)用 spawn 函數(shù)
function spawn(file, args, options) { const child = new ChildProcess(); child.spawn(options); }
-
創(chuàng)建 ChildProcess 實(shí)例,創(chuàng)建子進(jìn)程也是調(diào)用 C++ 層 this._handle.spawn 方法
function ChildProcess() { // C++層定義 this._handle = new Process(); }
-
通過 child.spawn 調(diào)用到 ChildProcess.prototype.spawn 方法中。其中 getValidStdio 方法會(huì)根據(jù) options.stdio 創(chuàng)建和 C++ 交互的 Pipe 對(duì)象,并獲得對(duì)應(yīng)的文件描述符,將文件描述符寫入到環(huán)境變量 NODE_CHANNEL_FD 中,調(diào)用 C++ 層創(chuàng)建子進(jìn)程,在調(diào)用 setupChannel 方法
ChildProcess.prototype.spawn = function(options) { // 預(yù)處理進(jìn)程間通信的數(shù)據(jù)結(jié)構(gòu) stdio = getValidStdio(stdio, false); const ipc = stdio.ipc; const ipcFd = stdio.ipcFd; //將文件描述符寫入環(huán)境變量中 if (ipc !== undefined) { ArrayPrototypePush(options.envPairs, `NODE_CHANNEL_FD=${ipcFd}`); } // 創(chuàng)建進(jìn)程 const err = this._handle.spawn(options); // 添加send方法和監(jiān)聽I(yíng)PC中數(shù)據(jù) if (ipc !== undefined) setupChannel(this, ipc, serialization); }
-
子進(jìn)程啟動(dòng)時(shí),會(huì)根據(jù)環(huán)境變量中是否存在 NODE_CHANNEL_FD 判斷是否調(diào)用 _forkChild 方法,創(chuàng)建一個(gè) Pipe 對(duì)象, 同時(shí)調(diào)用 open 方法打開對(duì)應(yīng)的文件描述符,在調(diào)用setupChannel
function _forkChild(fd, serializationMode) { const p = new Pipe(PipeConstants.IPC); p.open(fd); p.unref(); const control = setupChannel(process, p, serializationMode); }
句柄傳遞
setupChannel 主要是完成了處理接收的消息、發(fā)送消息、處理文件描述符傳遞等
function setipChannel(){ channel.onread = function(arrayBuffer){ //... } target.on('internalMessage', function(message, handle){ //... }) target.send = function(message, handle, options, callback){ //... } target._send = function(message, handle, options, callback){ //... } function handleMessage(message, handle, internal){ //... } }
- target.send: process.send 方法,這里 target 就是進(jìn)程對(duì)象本身.
- target._send: 執(zhí)行具體 send 邏輯的函數(shù), 當(dāng)參數(shù) handle 不存在時(shí), 表示普通的消息傳遞;若存在,包裝為內(nèi)部對(duì)象,表明是一個(gè) internalMessage 事件觸發(fā)。調(diào)用使用JSON.stringify 序列化對(duì)象, 使用channel.writeUtf8String 寫入文件描述符中
- channel.onread: 獲取到數(shù)據(jù)時(shí)觸發(fā), 跟 channel.writeUtf8String 相對(duì)應(yīng)。通過 JSON.parse 反序列化 message 之后, 調(diào)用 handleMessage 進(jìn)而觸發(fā)對(duì)應(yīng)事件
- handleMessage: 用來判斷是觸發(fā) message 事件還是 internalMessage 事件
- target.on('internalMessage'): 針對(duì)內(nèi)部對(duì)象做特殊處理,在調(diào)用 message 事件
進(jìn)程間消息傳遞
-
父進(jìn)程通過 child.send 發(fā)送消息 和 server/socket 句柄對(duì)象
-
普通消息直接 JSON.stringify 序列化;對(duì)于句柄對(duì)象來說,需要先包裝成為內(nèi)部對(duì)象
message = { cmd: 'NODE_HANDLE', type: null, msg: message };
通過 handleConversion.[message.type].send 的方法取出句柄對(duì)象對(duì)應(yīng)的 C++ 層面的 TCP 對(duì)象,在采用JSON.stringify 序列化
const handleConversion = { 'net.Server': { simultaneousAccepts: true, send(message, server, options) { return server._handle; }, got(message, handle, emit) { const server = new net.Server(); server.listen(handle, () => { emit(server); }); } } //.... }
-
最后將序列化后的內(nèi)部對(duì)象和 TCP 對(duì)象寫入到 IPC 通道中
-
子進(jìn)程在接收到消息之后,使用 JSON.parse 反序列化消息,如果為內(nèi)部對(duì)象觸發(fā) internalMessage 事件
-
檢查是否帶有 TCP 對(duì)象,通過 handleConversion.[message.type].got 得到和父進(jìn)程一樣的句柄對(duì)象
-
最后發(fā)觸發(fā) message 事件傳遞處理好的消息和句柄對(duì)象,子進(jìn)程通過 process.on 接收