четверг, 29 сентября 2016 г.

Node.js Cluster

But how to increase the number of process with Node so that you can have a good scaling system?
In the .NET world you can find something similar is ASP.NET (hosted on IIS) and it's called "web garden", in Node instead it's called Cluster. Basically there are more than one active process and a "manager".
In that scenario you can use one process for each core of you computer, so your Node application can scale better with you hardware.
Basically it's like running 'node app.js' for each core you have, and another process to manage them all.
First step, install some packages:
npm install cluster --save
The goal of this example is to create one process for each core, so the first thing to do is to read the number of cores installed on your laptop:
var cluster = require('cluster');

if (cluster.isMaster) {
  var numCPUs = require('os').cpus().length;
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  Object.keys(cluster.workers).forEach(function(id) {
    console.log(cluster.workers[id].process.pid);
  });
}
cluster.isMaster is necessary to be sure that you are forking it just one time.
Now if you run the app you should have one process for each core, plus the master
In my case I've 8 core, so 9 process because of master.
The next step is to add a webserver, so
npm install http --save
and put your logic for each fork:
var cluster = require('cluster');
var http = require('http');

if (cluster.isMaster) {
  var numCPUs = require('os').cpus().length;
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  Object.keys(cluster.workers).forEach(function(id) {
    console.log(cluster.workers[id].process.pid);
  });
} else{

  // Create HTTP server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("This answer comes from the process " + process.pid);

  }).listen(8080);
}
Now calling the webserver you can see which process is answering your request:
image
Because the code is too simple, probably you'll get the same 'pid' for each request from your browser. The easier way to test it is to lock the thread (yes, I said that) so the "balancer" can switch the request to another process demonstrating the cluster.
In Node there isn't something like Thread.Sleep, so the best way to lock a thread is create something that keeps it busy, something like an infinite loop :smirk:
var cluster = require('cluster');
var http = require('http');

if (cluster.isMaster) {
  var numCPUs = require('os').cpus().length;
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  Object.keys(cluster.workers).forEach(function(id) {
    console.log(cluster.workers[id].process.pid);
  });
} else{

  // Create HTTP server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("This answer comes from the process " + process.pid);

    //that's just for example
    while(true){

    }

  }).listen(8080);
}
If you want to manage all the processes and to log some events, it could be helpful to track some events for each process and to send a message from the "worker" to the "master" or to check when a process dies.
To do that it's necessary to use message event on the worker, so here's the code:
var cluster = require('cluster');
var http = require('http');

if (cluster.isMaster) {

  console.log("Master pid: " + process.pid);

  var numberOfRequests = 0;

  var numCPUs = require('os').cpus().length;
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  Object.keys(cluster.workers).forEach(function(id) {
    console.log('creating process with id = ' + cluster.workers[id].process.pid);

    //getting message
    cluster.workers[id].on('message', function messageHandler(msg) {
      if (msg.cmd && msg.cmd == 'notifyRequest') {
        numberOfRequests += 1;
      }

      console.log("Getting message from process : ", msg.procId);
    });

    //Getting worker online
    cluster.workers[id].on('online', function online()
    {
      console.log("Worker pid: " + cluster.workers[id].process.pid + " is online");
    });

    //printing the listening port
    cluster.workers[id].on('listening', function online(address)
    {
      console.log("Listening on port + " , address.port);
    });

    //Catching errors
    cluster.workers[id].on('exit', function(code, signal) {
      if( signal ) {
        console.log("worker was killed by signal: "+signal);
      } else if( code !== 0 ) {
        console.log("worker exited with error code: "+code);
      } else {
        console.log("worker success!");
      }
    });
  });

  //Printing number of requests
  setInterval(function(){
    console.log("Handled " + numberOfRequests + " requests");
  }, 3000);

} else {

  // Create HTTP server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("This answer comes from the process " + process.pid);

    console.log("Message sent from http server");

    // Notify master about the request
    process.send({ cmd: 'notifyRequest', procId : process.pid });


  }).listen(8080);
}

Пример.

Файл cluster-boot.js

// Данный сценарий выполнит код из файла app.js с помощью нескольких worker.
// Число worker опеределено в константе WORKER_COUNT.

// Мастер-процесс будет отвечать на сигнал SIGHUP,
// который будет перезапускать worker'ы и перезагружать app.

var workerCount = process.env.WORKER_COUNT || 2
    , cluster = require('cluster');

// Определяем что каждый worker должен сделать.
// В данном случае каждый worker должен выполнить код из файла app.js.
// Подразумевается, что код в app.js представляет собой простой HTTP-сервер.
cluster.setupMaster({exec: 'app.js'});

/////////////////////////////////////////////////////////////
// Создание новых worker
/////////////////////////////////////////////////////////////

// Значение, определяющее разрешно ли создавать новые worker.
var stop = false;

// Создать первоначальный набор worker.
forkNewWorkers();

// Функция для создания необходимого числа новых worker в случае,
// если ранее не было решено их все остановить.
function forkNewWorkers () {
    if (!stop) {
        for (var i = numWorkers(); i < workerCount; i++) {
            cluster.fork();
        }
    }
}

// Функция для определения числа активных в данный момент worker.
function numWorkers () {
    return Object.keys(cluster.workers).length;
}

// Каждый worker может отключиться из-за того, что его процесс был убит или
// из-за того, что мы прошлись по массиву workersToStop и рестартовали каждый worker, перечисленный в нем.
// В любом случае при отключении worker мы создаем взамен него новые worker'ы для выполнения работ.
cluster.on('disconnect', forkNewWorkers);

// Теперь каждый worker начинает слушать порт.
// Как только worker будет готов к своей работе мы посылаем сигнал для перезапуска следующего worker.
cluster.on('listening', stopNextWorker);

//////////////////////////////////////////////////////////////
// Уничтожение worker
//////////////////////////////////////////////////////////////

// Список worker находящихся в очереди на перезапуск.
var workersToStop = [];

// Сообщить следующему worker, находящемуся в очереди на перезапуск, отключиться.
// Это позволит процессу завершить свою работу за 60 секунд перед отправкой сигнала SIGTERM.
function stopNextWorker () {
    var i = workersToStop.pop()
        , worker = cluster.workers[i];
    if (worker) {stopWorker(worker);}
}

// Остановить работу всех worker за раз.
function stopAllWorkers () {
    stop = true;
    console.log('stop all workers');
    for (var id in cluster.workers) {
        stopWorker(cluster.workers[id]);
    }
}

// Функция для остановки конкретного wroker.
// Делаем задержку в 60 секунд после отключения worker перед отправкой сигнала SIGTERM.
function stopWorker (worker) {
  console.log('stopping', worker.process.pid);
  worker.disconnect();
  var killTimer = setTimeout(function () {
    worker.kill();
  }, 60000);
  // Убеждаемся, что мы не будем подвешивать мастер-процесс при добавлении этого setTimeout
  killTimer.unref();
}

/////////////////////////////////////////////////////////////
// Прослушивания передачи сигналов в мастер-процесс из сторонних программ
/////////////////////////////////////////////////////////////

// Если сигнал HUP послан в мастер-процесс, то последовательно перезапустить все worker.
process.on('SIGHUP', function () {
    console.log('restarting all workers');
    workersToStop = Object.keys(cluster.workers);
    stopNextWorker();
});

// Если сигнал TERM послан в мастер-процесс, то убить все worker за раз.
process.on('SIGTERM', stopAllWorkers);

/////////////////////////////////////////////////////////////
// Вывод сообщения об успешном запуске программы
/////////////////////////////////////////////////////////////

// Вывод в консоль сообщения о том, что программа запущена успешно
console.log('app master', process.pid, 'booted');