Skip to content

Commit

Permalink
cluster improvements: Worker class and isolate internal messages
Browse files Browse the repository at this point in the history
Fixes #2388
  • Loading branch information
AndreasMadsen authored and ry committed Jan 5, 2012
1 parent e21643d commit 5f08c3c
Show file tree
Hide file tree
Showing 11 changed files with 928 additions and 165 deletions.
6 changes: 6 additions & 0 deletions doc/api/child_processes.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ And then the child script, `'sub.js'` might look like this:
In the child the `process` object will have a `send()` method, and `process`
will emit objects each time it receives a message on its channel.

There is a special case when seinding a `{cmd: 'NODE_foo'}` message. All messages
containging a `NODE_` prefix in its `cmd` property will not be emitted in
the `message` event, since this are internal messages used by node core.
Messages contain the prefix are emitted in the `internalMessage` event, you
should by all means avoid using this feature, it may change without warranty.

By default the spawned Node process will have the stdout, stderr associated
with the parent's. To change this behavior set the `silent` property in the
`options` object to `true`.
Expand Down
234 changes: 195 additions & 39 deletions doc/api/cluster.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ all share server ports.
console.log('worker ' + worker.pid + ' died');
});
} else {
// Worker processes have a http server.
http.Server(function(req, res) {
// Workers can share any TCP connection
// In this case its a HTTP server
http.createServer(function(req, res) {
res.writeHead(200);
res.end("hello world\n");
}).listen(8000);
Expand All @@ -34,66 +35,221 @@ Running node will now share port 8000 between the workers:
Worker 2438 online
Worker 2437 online

The difference between `cluster.fork()` and `child_process.fork()` is simply
that cluster allows TCP servers to be shared between workers. `cluster.fork`
is implemented on top of `child_process.fork`. The message passing API that
is available with `child_process.fork` is available with `cluster` as well.
As an example, here is a cluster which keeps count of the number of requests
in the master process via message passing:

### cluster.isMaster

This boolean flag is true if the process is a master. This is determined
by the `process.env.NODE_UNIQUE_ID`. If `process.env.NODE_UNIQUE_ID` is
undefined `isMaster` is `true`.

### cluster.isWorker

This boolean flag is true if the process is a worker forked from a master.
If the `process.env.NODE_UNIQUE_ID` is set to a value different efined
`isWorker` is `true`.

### Event: 'fork'

When a new worker is forked the cluster module will emit a 'fork' event.
This can be used to log worker activity, and create you own timeout.

var timeouts = [];
var errorMsg = function () {
console.error("Something must be wrong with the connection ...");
});

cluster.on('fork', function (worker) {
timeouts[worker.uniqueID] = setTimeout(errorMsg, 2000);
});
cluster.on('listening', function (worker) {
clearTimeout(timeouts[worker.uniqueID]);
});
cluster.on('death', function (worker) {
clearTimeout(timeouts[worker.uniqueID]);
errorMsg();
});

### Event: 'online'

After forking a new worker, the worker should respond with a online message.
When the master receives a online message it will emit such event.
The difference between 'fork' and 'online' is that fork is emitted when the
master tries to fork a worker, and 'online' is emitted when the worker is being
executed.

cluster.on('online', function (worker) {
console.log("Yay, the worker responded after it was forked");
});

### Event: 'listening'

When calling `listen()` from a worker, a 'listening' event is automatically assigned
to the server instance. When the server is listening a message is send to the master
where the 'listening' event is emitted.

cluster.on('listening', function (worker) {
console.log("We are now connected");
});

### Event: 'death'

When any of the workers die the cluster module will emit the 'death' event.
This can be used to restart the worker by calling `fork()` again.

cluster.on('death', function(worker) {
console.log('worker ' + worker.pid + ' died. restart...');
cluster.fork();
});

### cluster.fork([env])

Spawn a new worker process. This can only be called from the master process.
The function takes an optional `env` object. The properties in this object
will be added to the process environment in the worker.

### cluster.workers

In the cluster all living worker objects are stored in this object by there
`uniqueID` as the key. This makes it easy to loop thouge all liveing workers.

// Go througe all workers
function eachWorker(callback) {
for (var uniqueID in cluster.workers) {
callback(cluster.workers[uniqueID]);
}
}
eachWorker(function (worker) {
worker.send('big announcement to all workers');
});

Should you wich to reference a worker over a communication channel this unsing
there `uniqueID` this is also the easies way to find the worker.

socket.on('data', function (uniqueID) {
var worker = cluster.workers[uniqueID];
});

## Worker

This object contains all public information and method about a worker.
In the master it can be obtainedusing `cluster.workers`. In a worker
it can be obtained ained using `cluster.worker`.

### Worker.uniqueID

Each new worker is given its own unique id, this id i stored in the `uniqueID`.

### Worker.process

All workers are created using `child_process.fork()`, the returned object from this
function is stored in process.

### Worker.send(message, [sendHandle])

This function is equal to the send methods provided by `child_process.fork()`.
In the master you should use this function to send a message to a specific worker.
However in a worker you can also use `process.send(message)`, since this is the same
function.

This example will echo back all messages from the master:

if (cluster.isMaster) {
var worker = cluster.fork();
worker.send('hi there');

} else if (cluster.isWorker) {
process.on('message', function (msg) {
process.send(msg);
});
}

### Worker.destroy()

This function will kill the worker, and inform the master to not spawn a new worker.
To know the difference between suicide and accidently death a suicide boolean is set to true.

cluster.on('death', function (worker) {
if (worker.suicide === true) {
console.log('Oh, it was just suicide' – no need to worry').
}
});

// destroy worker
worker.destroy();

### Worker.suicide

This property is a boolean. It is set when a worker dies, until then it is `undefined`.
It is true if the worker was killed using the `.destroy()` method, and false otherwise.

### Event: message

This event is the same as the one provided by `child_process.fork()`.
In the master you should use this event, however in a worker you can also use
`process.on('message')`

As an example, here is a cluster that keeps count of the number of requests
in the master process using the message system:

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;

if (cluster.isMaster) {
// Fork workers.
for (var i = 0; i < 2; i++) {
var worker = cluster.fork();

worker.on('message', function(msg) {
if (msg.cmd && msg.cmd == 'notifyRequest') {
numReqs++;
}
});
}

// Keep track of http requests
var numReqs = 0;
setInterval(function() {
console.log("numReqs =", numReqs);
}, 1000);

// Count requestes
var messageHandler = function (msg) {
if (msg.cmd && msg.cmd == 'notifyRequest') {
numReqs += 1;
}
};

// Start workers and listen for messages containing notifyRequest
cluster.autoFork();
Object.keys(cluster.workers).forEach(function (uniqueID) {
cluster.workers[uniqueID].on('message', messageHandler);
});

} else {

// Worker processes have a http server.
http.Server(function(req, res) {
res.writeHead(200);
res.end("hello world\n");
// Send message to master process

// notify master about the request
process.send({ cmd: 'notifyRequest' });
}).listen(8000);
}

### Event: online

Same as the `cluster.on('online')` event, but emits only when the state change
on the specified worker.

### cluster.fork([env])
cluster.fork().on('online', function (worker) {
// Worker is online
};

Spawn a new worker process. This can only be called from the master process.
The function takes an optional `env` object. The propertyies in this object
will be added to the process environment in the worker.
### Event: listening

### cluster.isMaster
### cluster.isWorker
Same as the `cluster.on('listening')` event, but emits only when the state change
on the specified worker.

Boolean flags to determine if the current process is a master or a worker
process in a cluster. A process `isMaster` if `process.env.NODE_WORKER_ID`
is undefined.
cluster.fork().on('listening', function (worker) {
// Worker is listening
};

### Event: 'death'
### Event: death

When any of the workers die the cluster module will emit the 'death' event.
This can be used to restart the worker by calling `fork()` again.

cluster.on('death', function(worker) {
console.log('worker ' + worker.pid + ' died. restart...');
cluster.fork();
});
Same as the `cluster.on('death')` event, but emits only when the state change
on the specified worker.

Different techniques can be used to restart the worker depending on the
application.
cluster.fork().on('death', function (worker) {
// Worker has died
};
16 changes: 15 additions & 1 deletion lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,25 @@ function setupChannel(target, channel) {
jsonBuffer += pool.toString('ascii', offset, offset + length);

var i, start = 0;

//Linebreak is used as a message end sign
while ((i = jsonBuffer.indexOf('\n', start)) >= 0) {
var json = jsonBuffer.slice(start, i);
var message = JSON.parse(json);

target.emit('message', message, recvHandle);
//Filter out internal messages
//if cmd property begin with "_NODE"
if (message !== null &&
typeof message === 'object' &&
typeof message.cmd === 'string' &&
message.cmd.indexOf('NODE_') === 0) {
target.emit('inernalMessage', message, recvHandle);
}
//Non-internal message
else {
target.emit('message', message, recvHandle);
}

start = i + 1;
}
jsonBuffer = jsonBuffer.slice(start);
Expand Down
Loading

0 comments on commit 5f08c3c

Please sign in to comment.