前言

上节我们讲到,通过 fork() 或者其他API,创建子进程之后,可以通过 send()process.on('message') 进行父子进程间的通信。这样就实现了主进程代理请求到工作进程,实现了 Nodejs集群

父子进程间通信

负载均衡

通过代理,可以避免端口不能重复监听的问题,甚至可以在代理进程上做适当的负载均衡,使得每个子进程可以较为均衡地执行任务。下面我们构建了一个简单的 Web 服务器,并实现在两个工作进程之间做简单的负载均衡。

主进程,负责代理到对应进程中:

// main.js
const { fork } = require('child_process');
const normal = fork('subprocess.js', ['normal']);
const special = fork('subprocess.js', ['special']);
// Open up the server and send sockets to child. Use pauseOnConnect to prevent
// 套接字在发送给子进程之前不会被读取
const server = require('net').createServer({ pauseOnConnect: true });
let flag = 0;
server.on('connection', (socket) => {
  flag++;
  // this is special priority.
  if (flag % 2 === 0) {
    special.send('socket', socket);
    return;
  }
  // This is normal priority.
  normal.send('socket', socket);
});
server.listen(1337);

这是工作进程,接收socket对象并做出响应:

// subprocess.js
process.on('message', (m, socket) => {
  if (m === 'socket') {
    // Check that the client socket exists. 
    // It is possible for the socket to be closed between the time it is
    if (socket) {
      // console.log(`Request handled with ${process.argv[2]} priority`);
      socket.end(`Request handled with ${process.argv[2]} priority, running on ${process.pid}`);
    }
  }
});

然后我又编写了一个 Nodejs 脚本,来发出十个 HTTP 请求:

const cp = require("child_process");
for (let i = 0; i < 10; i++) {
  cp.exec(`curl --http0.9 "http://127.0.0.1:1337"`, (err, stdout, stderr) => {
    console.log(`finished: ${i}, and received: `, stdout);
  })
}

最后运行结果如下:

句柄传递

在使用 send() 方法时,我们注意到,除了能通过IPC发送数据外,还能发送句柄。第二个可选参数就是一个句柄:

child.send(message, [sendHandle]);

💡 句柄是一种可以用来标识资源的引用,它的内部包含了指向对象的文件描述符。比如句柄可以用来标识一个服务器端socket对象、一个客户端socket对象、一个UDP套接字、一个管道等。

在主进程将句柄发送给子进程之后,工作模型就从主进程响应用户请求变成了子进程监听用户活动:

进程对象send()方法可以发送的句柄类型包括如下几种:

  • net.Socket。TCP套接字。
  • net.Server。TCP服务器,任意建立在TCP服务上的应用层服务都可以享受到它带来的好处。
  • net.Native。C++层面的TCP套接字或IPC管道。
  • dgram.Socket。UDP套接字。
  • dgram.Native。C++层面的UDP套接字。

💡 另外要注意,send()方法能发送消息和句柄并不意味着它能发送任意对象,message 参数和文件句柄都要先通过 JSON.stringfy() 进行序列化后再放入IPC通道中:

集群

通过 child_process模块,我们完成了父子进程的创建和通信,已经初步搭建了一个Node集群。还有一些问题需要考虑:

  • 性能问题。
  • 多个工作进程的存活状态管理。
  • 工作进程的平滑重启。
  • 配置或者静态数据的动态重新载入。
  • 其他细节。

这其中最重要的便是集群的稳定性,这决定了该服务模型能否真正用于实践生成中。虽然我们创建了很多工作进程,但每个工作进程依然是在单线程上执行的,它的稳定性还不能得到完全的保障。我们需要建立起一个健全的机制来保障Node应用的健壮性。

子进程事件

父进程能监听到的,与子进程相关的事件:

  • error:当子进程无法被复制创建、无法被杀死、无法发送消息时会触发该事件。
  • exit:子进程退出时触发该事件。如果是正常退出,这个事件的第一个参数为退出码,否则为null。如果进程是通过kill()方法被杀死的,会得到第二个参数,它表示杀死进程时的信号。
  • close:在子进程的标准输入输出流中止时触发该事件,参数与exit相同。
  • disconnect:在父进程或子进程中调用disconnect()方法时触发该事件,在调用该方法时将关闭监听IPC通道

除了 send() 外,还能通过 kill() 方法给子进程发送消息。kill() 方法并不能真正地将通过IPC相连的子进程杀死,它只是给子进程发送了一个系统信号。默认情况下,父进程将通过 kill() 方法给子进程发送一个 SIGTERM信号

// 子进程
child.kill([signal]);
// 当前进程
process.kill(pid, [signal]);
// 监听
process.on(signal, callback)

💡 在POSIX标准中,有一套完备的信号系统,在命令行中执行kill -l可以看到详细的信号列表,如下所示:

而 Node 提供了这些信号对应的信号事件,每个进程都可以监听这些信号事件。这些信号事件是用来通知进程的,每个信号事件有不同的含义,进程在收到响应信号时,应当做出约定的行为:

process.on('SIGTERM', () => {
    console.log("got sigterm, exiting...");
    process.exit(1);
});
console.log("process running on: ", process.pid);
process.kill(process.pid, "SIGTERM");

自动重启

有了父子进程之间的相关事件之后,就可以在这些关系之间创建出需要的机制了,至少我们能够通过监听子进程的 exit事件 来获知其退出的信息。接着前文的多进程架构,我们在主进程上要加入一些子进程管理的机制,比如重新启动一个工作进程来继续服务:

主进程代码:

// master.js
// master.js
const { fork } = require('child_process');
const cpus = require('os').cpus();
const server = require('net').createServer();
server.listen(1337);
const workers = {};
// process.on('uncaughtException', function (err) {
//   console.log(`Master uncaughtException:\r\n`);
//   console.log(err);
// });
const createWorker = () => {
  const worker = fork('./worker.js');
  // 收到信号后立即重启新进程
  worker.on('message', function (message) {
    if (message.act === 'suicide') {
      createWorker();
    }
  });
  // 某个进程终止时重新启动新的进程
  worker.on('exit', () => {
    console.log('Worker ' + worker.pid + ' exited.');
    delete workers[worker.pid];
    // createWorker();
  });
  // 句柄转发
  worker.send('server', server);
  workers[worker.pid] = worker;
  console.log('Create worker. pid: ' + worker.pid);
};
for (let i = 0; i < cpus.length; i++) {
  createWorker();
}
// server.close();
// 进程自己退出时,让所有工作进程退出
process.on('exit', () => {
  for (let pid in workers) {
    workers[pid].kill();
  }
});

子进程代码:

// worker.js
const http = require('http');
const server = http.createServer((req, res) => {
  res.writeHead(200, {'Content-Type': 'text/plain'});
  res.end('handled by child, pid is ' + process.pid + '\n');
  // 抛出异常,捕获后终止进程
  throw new Error('throw exception');
});
var worker;
process.on('message', (m, tcp) => {
  if (m === 'server') {
    worker = tcp;
    worker.on('connection', (socket) => {
      server.emit('connection', socket);
    });
  }
});
// 捕获异常后终止进程
process.on('uncaughtException', (err) => {
  // 主动发出信号,避免等待连接断开时收到新请求而缺少进程无法响应
  process.send({
    act: 'suicide'
  });
  // 停止接收新的连接
  worker.close(function () {
    // 所有已有连接断开后,退出进程
    process.exit(1);
  });
  // 避免长连接请求长时间无法终止,5s后自动终止
  setTimeout(() => {
    process.exit(1);
  }, 5000)
});

运行父进程 master.js,控制台中会打印出开启的进程 PID

在 Linux 中,你可以直接使用 kill -9 [pid] 来终止进程。在 Windows 中,你需要打开任务管理器,找到 node.exe 的进程,终止其中某个。此时命令行会显示该进程被终止了,然后重新开启一个新的进程。

当然,你也可以使用我们之前写的 run.js 脚本,每发起一个请求,子进程响应请求之后会抛出一个异常,异常在捕获之后会终止该进程。

💡 我们之前写的 run.js 脚本是并行执行的,此时会存在多个请求被分配到同一个 socket ,即分配到同一个进程中执行。那么就会存在互斥的问题,即某个请求结束后就终止该进程,导致其他请求无法获得响应而终止。此时你需要将 exec 方法改为同步方法:

const cp = require("child_process");
const cpus = require("os").cpus();
const sleep = (delay) => {
  const now = Date.now();
  while (Date.now() - now < delay);
  return;
}
for (let i = 0; i < cpus.length; i++) {
  const out = cp.execSync(`curl --http0.9 "http://127.0.0.1:1337"`);
  sleep(1000);
  console.log(out.toString());
}

该模型一旦有异常出现,主进程会创建新的工作进程来为用户服务,旧的进程一旦处理完已有连接就自动断开。整个过程使得我们的应用的稳定性和健壮性大大提高:

总结

至此,我们完成了一个简单的基于父子进程通信、具备异常重启进程功能的 Web服务器 就已经搭建完成了。对于 Nodejs 多进程编程你也有了初步的了解。接下来我们将介绍 cluster模块,并介绍一下在 Nodejs 中进行多线程编程。

以上就是Nodejs搭建多进程Web服务器实现过程的详细内容,更多关于Nodejs搭建多进程Web服务器的资料请关注阿兔在线工具其它相关文章!

点赞(0)

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部