Download .zip |
Info | Documentation | View files (20) | Download .zip | Reputation | Support forum | Blog | Links |
Last Updated | Ratings | Unique User Downloads | Download Rankings | |||||
2017-05-01 (17 hours ago) | Not enough user ratings | Total: 35 This week: 8 | All time: 494 This week: 3 |
Version | License | JavaScript version | Categories | |||
collab-ms 1.0 | MIT/X Consortium ... | 6 | Node.js, Web services, Language, Libr..., E..., T... |
Description | Author | ||||||||||||||
This package can manage the creation and communication between manage and worker processes like in a company. Innovation Award |
|
Simple, yet complete micro-services[1] library for Node. Zero dependencies, high-quality typed code, simple flow, tree-like structure support, optional Promises, load balancing and queues. pm2 compatible.
Install:
npm install collab-ms
GitHub: https://github.com/Ami777/collab-ms
npm: https://www.npmjs.com/package/collab-ms
[1] More like pseudo-micro-services. We currently do not support different languages, TCP/IP communication etc.
I created this library to be able to create complex tree-like-structure micro-services architecture in my Node apps. The idea was to create simple, yet complete solution with good quality typed code (TypeScript & ES6, however you can use this library in pure JS without any additional steps). ZERO external dependencies.
The name in shortcut for "collaboration". In this library we think about micro-services structure as about company. In fact, few concepts are based on management theory. We think about each level as about collaborators: Managers and Workers. Typically, first level is called CEO and is Manager. Each mid-level is both Manager and Worker. Last level is usually Worker.
The communication is done in Manager-Worker manner. When you run your process you usually want to spawn main Manager - we call it CEO Manager. This Manager may spawn another Workers and Managers. This is done automatically by forking your main process. It's up to you to decide how to use collab-ms - we do not force you to anything. Here we will cover some basic about communication and our proposition of use.
First, Manager and structure. Then Worker. Then communication. Then some fun.
First, let's create new project with index.js file. Then install collab-ms from npm:
npm i collab-ms --save
Add library to your code:
const collab = require('collab-ms');
You need to decide which Transport to use. We will now use Transport built-in in Collab and Node :
collab.setTransport( collab.useTransportCpFork() );
We need to create basic structure. First, we will create first Manager - CEO:
const ceo = new collab.Manager();
This is it. This is all you need to build basic Manager which then can create new Workers and send messages. We cannot receive normal messages yet.
We will use one single file to manage all of the possible processes types. To get type of process spawned we can use collab.getMyRole() and collab.isCEO() functions. Watch out! This is using command line parameters to determinate type. So if you run index.js with any parameters detecting CEO may not work as expected.
Let's use this knowledge to run different job in CEO and in Worker. Modify your const ceo... code:
if (collab.isCEO()){
//CEO code
const ceo = new collab.Manager();
} else {
//Worker code
}
In the else part of code we will create Worker instance:
if (collab.isCEO()){
//CEO code
const ceo = new collab.Manager();
} else {
//Worker code
const worker = new collab.Worker();
}
Worker can now send messages to closest Manager (it will be CEO in our simple case), but we cannot read any messages from Manager (CEO) yet. Also, Worker is not yet spawned. Let's spawn it using Manager.newWorker() method.
While forking process to create new Worker we have a lot of options. But it's simplest form in just enough for us right now. Let's add this code to CEO code:
ceo.newWorker('workerExample');
This will fork new process with type 'workerExample' (this is our name, it can be anything, but it may contain only letters).
Let's add console.log() in both parts and now the complete code looks like this:
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
const ceo = new collab.Manager();
if (collab.isCEO()){
//CEO code
console.log('CEO is spawned.');
const ceo = new collab.Manager();
ceo.newWorker('worker');
} else {
//Worker code
console.log('Worker is spawned.');
const worker = new collab.Worker();
}
Run it with:
node index
And you should see this:
CEO is spawned.
Worker is spawned.
Node won't finish by itself, click Ctrl+C to kill CEO with all of the spawned processes (Worker in our case). Remove console.log() lines.
Now it's time to add communication. We will try something simple - normal non-Promised messages. We need to modify our code to pass callback function for massages in Manager as first argument in constructor. Callback will receive 3 arguments, but we only need 2 right now. Add this callback:
function (worker, data) {
console.log('Message', data, 'from', worker.name);
}
We also need to add very similar code to Worker constructor. We just want 1 argument which is data:
function (data) {
console.log('Message', data, 'from Manager');
}
You can then try to send message from Worker to Manager using Worker.send() method in Worker part of code:
worker.send('Hi boss!');
You can send whatever you want - it may be primitive like in our example or more complex object.
Our code is now:
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
const ceo = new collab.Manager();
if (collab.isCEO()){
//CEO code
const ceo = new collab.Manager(function (worker, data) {
console.log('Message', data, 'from', worker.name);
});
ceo.newWorker('workerExample');
} else {
//Worker code
const worker = new collab.Worker(function (data) {
console.log('Message', data, 'from Manager');
});
worker.send('Hi boss!');
}
When we run it we get:
Message Hi boss! from WORKEREXAMPLE #1
As you can see, everything is working as expected. Also, you can see name automatically given to Worker by Manager. This name is unique only for forking Manager, not in the whole structure.
Manager can send messages, too. This will take additional steps as Manager may have lots of Workers. Unlike Worker which have only one Manager. We need to access our Worker from Manager using one of these options:
We will use first option right now. So we modify creation line to:
ceo.newWorker('workerExample').then( workerExample => {
});
Now we can simply call WorkerInfo.send() method in the callback:
workerExample.send('Hi worker!');
The complete code is now:
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
const ceo = new collab.Manager();
if (collab.isCEO()){
//CEO code
const ceo = new collab.Manager(function (worker, data) {
console.log('Message', data, 'from', worker.name);
});
ceo.newWorker('workerExample').then( workerExample => {
workerExample.send('Hi worker!');
});
} else {
//Worker code
const worker = new collab.Worker(function (data) {
console.log('Message', data, 'from Manager');
});
worker.send('Hi boss!');
}
And it's result is:
Message Hi worker! from Manager
Message Hi boss! from WORKEREXAMPLE #1
We could also use cool async/await syntax:
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
const ceo = new collab.Manager();
if (collab.isCEO()){
//CEO code
const ceo = new collab.Manager(function (worker, data) {
console.log('Message', data, 'from', worker.name);
});
const workerExample = await ceo.newWorker('workerExample');
workerExample.send('Hi worker!');
} else {
//Worker code
const worker = new collab.Worker(function (data) {
console.log('Message', data, 'from Manager');
});
worker.send('Hi boss!');
}
That's all. You have basic communication done. We will now do some interesting stuff.
This will modify previous example. We will use Promised messages to simplify the flow of our code. You may find Bluebird library helpful for Promises. We also request Worker to do some real work - to add two numbers.
CEO doesn't need callback function as we won't use non-Promised mesages. However, you can still use them and mix both methods. It can be helpful to make your code readable but also to be able to receive non-Promised messages about the state of the Worker etc.
We will also use WorkerInfo.sendWithPromise() method instead of the WorkerInfo.send(). Result of WorkerInfo.sendWithPromise() is Promise. We will use it. CEO code part is now:
//CEO code
const ceo = new collab.Manager();
ceo.newWorker('workerExample').then( workerExample => {
workerExample.sendWithPromise({a: 2, b: 3}).then(function(result){
console.log('Result of 2+3 from Worker is', result);
}).catch(function(err){
console.log('Error in Worker', err);
});
});
Now, in the Worker code part we also need to modify arguments in constructor. We don't need first argument for now (we will change callback function into null). Second argument is also callback function but for Promised messages. It will be called like this: (data, resolve, reject). Watch out! resolve and reject arguments are not the same are standard Promises/A+ ones (they have second argument, not needed now).
Let's modify Worker code:
//Worker code
const worker = new collab.Worker(null, function (data, resolve, reject) {
resolve( data.a + data.b );
});
Run the code, you should see this:
Result of 2+3 from Worker is 5
Promises in collab-ms are easy and good-looking, aren't they?
Now we will try to make more complex structure. This will be CEO Manager-Manager/Worker-Worker. It looks like this, because usually mid-level manager is both Manager and Worker. Then we will try to communicate between closest processes (CEO-Manager and Manager-Worker). At the end we will take a look at passing messages up and down in structure (CEO-Worker and answer Worker-CEO).
In this tutorial we will use async/await for the creation of Workers.
To create our structure we need to modify and simplify our code like this:
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
switch(collab.getMyRole()){
case '': //CEO
const ceo = new collab.Manager();
const midLevelManager = await ceo.newWorker('midLevelManager');
break;
case 'midLevelManager': //Mid-level Manager
const midWorker = new collab.Worker();
const manager = new collab.Manager();
const newWorker = ceo.newWorker('worker');
break;
case 'worker': //Worker
const worker = new collab.Worker();
break;
}
Take a look - we use collab.getMyRole() function to get role of the process. Empty string means we want CEO. midLevelManager needs to create both: Worker and Manager instances. This code set ups our structure correctly.
Let's add some simple Promises communication between CEO-Manager and Worker-Manager:
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
switch(collab.getMyRole()){
case '': //CEO
const ceo = new collab.Manager();
const midLevelManager = await ceo.newWorker('midLevelManager');
midLevelManager.sendWithPromise('Hi.').then(function(data){
console.log('< Answer', data, 'from mid-level Manager');
});
break;
case 'midLevelManager': //Mid-level Manager
const midWorker = new collab.Worker(null, function(data, resolve, reject){
console.log('> Message', data, 'from CEO');
resolve(data);
});
const manager = new collab.Manager();
const newWorker = await manager.newWorker('worker');
newWorker.sendWithPromise('Yo!').then(function(data){
console.log('< Answer', data, 'from Worker');
});
break;
case 'worker': //Worker
const worker = new collab.Worker(null, function(data, resolve, reject){
console.log('> Message', data, 'from mid-level Manager');
resolve(data);
});
break;
}
Result is:
> Message Hi. from CEO
< Answer Hi. from mid-level Manager
> Message Yo! from mid-level Manager
< Answer Yo! from Worker
Now we can try to pass messages from CEO to last Worker and answer from last Worker to CEO. It requires only some simple logic, we will also remove previous messages for clarity (but the logics let us send all of the messages):
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
switch(collab.getMyRole()){
case '': //CEO
const ceo = new collab.Manager();
const midLevelManager = await ceo.newWorker('midLevelManager');
midLevelManager.sendWithPromise('Tell Worker to come to my office.').then(function(data){
console.log('< Answer', data, 'from Worker');
});
break;
case 'midLevelManager': //Mid-level Manager
const manager = new collab.Manager();
const newWorker = await manager.newWorker('worker');
const midWorker = new collab.Worker(null, function(data, resolve, reject){
if (data.indexOf('Worker') > -1){
newWorker.sendWithPromise('Boss told ya: ' + data).then(resolve).catch(reject);
} else {
console.log('> Message', data, 'from CEO');
resolve(data);
}
});
break;
case 'worker': //Worker
const worker = new collab.Worker(null, function(data, resolve, reject){
if (data.indexOf('Worker') > -1){
console.log('> Message', data, 'from Boss passed by mid-level Manager');
resolve('I\'m on my way CEO.');
} else {
console.log('> Message', data, 'from mid-level Manager');
resolve(data);
}
});
break;
}
The result it:
> Message Boss told ya: Tell Worker to come to my office. from Boss passed by mid-level Manager
< Answer I'm on my way CEO. from Worker
This is the last tutorial example. Here we will use special Balancer subtype of Manager. We will build structure like this:
We will tell Balancer to let each of 3 Workers to make 2 jobs at once. Each Worker will finish it's job after 2 seconds. Then, CEO will tell Balancer to give Workers 10 jobs.
First we will use simple communication (work-done non-Promised messages). CEO will tell Balancer to add jobs. Balancer will give jobs to Workers. When Worker is done it will send result to Balancer and it will output it.
Then we will use Promises for all the communication. The flow is almost the same, CEO will also tell Balancer to add jobs but CEO will get an answer from Balancer when all the jobs are done. We will need Bluebird library for this!
In this tutorial we will use async/await for the creation of Workers.
First, let's build our structure:
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
switch(collab.getMyRole()){
case '': //CEO
const ceo = new collab.Manager();
const midLevelBalancer = await ceo.newWorker('balancer');
break;
case 'balancer': //Balancer Manager
const manager = new collab.Balancer();
const midWorker = new collab.Worker();
for ( let i = 0; i < 3; i++ )
await manager.newBalancedWorker('worker', 2);
break;
case 'worker': //Worker
const worker = new collab.Worker();
break;
}
Take a look. To add Worker in Balancer we use special method Balancer.newBalancedWorker(). Second argument is maximum number of jobs that this Worker can do at the same time. Now, let CEO tell Balancer to add the jobs and let's do it.
CEO and Balancer parts looks now like this:
case '': //CEO
const ceo = new collab.Manager();
const midLevelBalancer = await ceo.newWorker('balancer');
midLevelBalancer.send('add10jobs');
break;
case 'balancer': //Balancer Manager
const manager = new collab.Balancer();
const midWorker = new collab.Worker(function(data){
if (data == 'add10jobs'){
for ( let i = 0; i < 10; i++ )
manager.addJob();
}
});
for ( let i = 0; i < 3; i++ )
await manager.newBalancedWorker('worker', 2);
break;
Now, we will add some test code to Worker:
const worker = new collab.Worker(function(){
setTimeout(function(){
worker.sendWorkDone('Done!');
}, 2000);
});
Can you see the Worker.sendWorkDone() method? It is used to inform Balancer that one job is finished.
And code to receive this message in Balancer, we will modify Balancer's constructor:
const manager = new collab.Balancer(function(worker, data){
console.log('Message ', data, 'from', worker.name);
});
Ready code looks like this:
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
switch(collab.getMyRole()){
case '': //CEO
const ceo = new collab.Manager();
const midLevelBalancer = await ceo.newWorker('balancer');
midLevelBalancer.send('add10jobs');
break;
case 'balancer': //Balancer Manager
const manager = new collab.Balancer(function(worker, data){
console.log('Message ', data, 'from', worker.name);
});
const midWorker = new collab.Worker(function(data){
if (data == 'add10jobs'){
for ( let i = 0; i < 10; i++ )
manager.addJob();
}
});
for ( let i = 0; i < 3; i++ )
await manager.newBalancedWorker('worker', 2);
break;
case 'worker': //Worker
const worker = new collab.Worker(function(){
setTimeout(function(){
worker.sendWorkDone('Done!');
}, 2000);
});
break;
}
And the result after about 4 seconds is:
Message Done! from WORKER #2
Message Done! from WORKER #1
Message Done! from WORKER #2
Message Done! from WORKER #1
Message Done! from WORKER #3
Message Done! from WORKER #3
Message Done! from WORKER #2
Message Done! from WORKER #1
Message Done! from WORKER #2
Message Done! from WORKER #1
This is what Balancer is doing after adding or finishing the job:
Let's change this code to use Promises, it will be a little bit more complex and will use Promise.all():
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
switch(collab.getMyRole()){
case '': //CEO
const ceo = new collab.Manager();
const midLevelBalancer = await ceo.newWorker('balancer');
midLevelBalancer.sendWithPromise('add10jobs').then(function(data){
console.log('Balancer told CEO that all jobs are done!', data);
});
break;
case 'balancer': //Balancer Manager
const manager = new collab.Balancer();
const midWorker = new collab.Worker(null, function(data, resolve, reject){
if (data == 'add10jobs'){
let promises = [];
for ( let i = 0; i < 10; i++ )
promises.push(manager.addJobWithPromise());
Promise.all(promises).then(resolve).catch(reject);
}
});
for ( let i = 0; i < 3; i++ )
await manager.newBalancedWorker('worker', 2);
break;
case 'worker': //Worker
const worker = new collab.Worker(null, function(data, resolve, reject){
setTimeout(function(){
resolve('Done!', true);
}, 2000);
});
break;
}
There is one new thing here. Look, when Worker is done it is passing true as a second argument to resolve function. This is very important in our code, because it will tell Manager it should treat answer as work-done Promised message.
The result after about 4 seconds is:
Balancer told CEO that all jobs are done! [ 'Done!', 'Done!', 'Done!', 'Done!', 'Done!', 'Done!', 'Done!', 'Done!', 'Done!', 'Done!' ]
Now we have all: Promises, balancing, queue, tree structure. In just a few lines of code.
There were some breaking changes. Usually this means adding one line after requiring the lib + one small change when creating new Workers.
Transport is the way collab-ms will work. The way processes are spawned, managed and are communicating with each other. Transports are here to make collab-ms easy to use in different environments and transparent to you.
You need to set Transport for collab-ms before you use it. Best place is just after require/include.
To set Transport call:
collab.setTransport( someTransport );
Read sections below for info about someTransport
.
You may use more than 1 Transport. This should not change anything in your code, except for a setTransport call.
This is basic Transport. It was the only transport in collab-ms 0.X. If you are migrating from older version - you might want to use it.
It uses child_process built-in in Node and forks process to create Workers.
Usage:
collab.setTransport( collab.useTransportCpFork() );
opts
argument in newWorker and newBalancedWorker is then ForkOptions passed to child_process.fork.
This is Transport that works well with pm2. It uses pm2 API to create Workers. Each process will be managed by pm2.
Usage:
collab.setTransport( collab.useTransportPm2( require('pm2') ) );
Remember to pass pm2
as argument of useTransportPm2. Like in the example above.
opts
argument in newWorker and newBalancedWorker is then options object passed to pm2.start.
This is typing file for collab-ms, it is also used as a reference/documentation. If you use collab-ms with TypeScript you have this docs in your IDE, too.
import { ChildProcess } from "child_process";
import ChildProcessForkTransport from "./transCp";
import Pm2Transport from "./transPm2";
declare module Collab {
interface NormalSendFunction {
/
* @param data Any data you want to pass.
*/
(data?: any): void;
}
interface ResolveFunction {
/
* @param data Any data you want to pass.
*/
(data?: any): void;
}
interface ResolveBalancedFunction {
/
* @param data Any data you want to pass.
* @param sendWorkDone False (default) means this will be sent as normal Promise answer. Set to True to send as work-done Promise answer.
*/
(data?: any, sendWorkDone?: boolean): void;
}
interface RejectFunction {
/
* @param error Error information to send.
*/
(error?: any): void;
}
interface RejectBalancedFunction {
/
* @param error Error information to send.
* @param sendWorkDone False (default) means this will be sent as normal Promise answer. Set to True to send as work-done Promise answer.
*/
(error?: any, sendWorkDone?: boolean): void;
}
interface WorkerMsgClbFunction {
/
* @param worker Current Worker info object.
* @param data Data passed from Worker.
* @param send This is shortcut to worker.send() function for quick answers.
*/
(worker?: WorkerInfo, data?: any, send?: NormalSendFunction): void;
}
interface ManagerMsgClbFunction {
/
* @param data Data passed from Manager.
* @param send This is shortcut to Worker.send() function for quick answers.
*/
(data?: any, send?: NormalSendFunction, sendWorkDone?: NormalSendFunction): void;
}
interface ManagerPromisedMsgClbFunction {
/
* @param data Data passed from Manager.
*/
(data?: any, resolve?: ResolveBalancedFunction, reject?: RejectBalancedFunction): void;
}
/
* Information about Worker.
*/
interface WorkerInfo {
/
* Name given automatically.
*/
name?: string;
/
* Type name given by you.
*/
type?: string;
/
* Options passed by you when forking.
*/
options?: any;
/
* Internal ChildProcess.
*/
process?: ChildProcess;
/
* Your internal data.
*/
data?: any;
/
* Function to send non-Promised message.
*/
send?: NormalSendFunction;
/
* Function to send Promised message.
*/
sendWithPromise?: NormalSendFunction;
}
/
* This is internal structure used for Promises.
*/
interface Promises {
id: number;
resolve?: ResolveFunction;
reject?: RejectFunction;
}
interface TransportNewWorkerFunc {
(name: string, type: string, moduleOrFile: string, options: any, data: any, opts: any, _objectifyDataFunc: any, onMsgFunc: any, _buildFuncSendWithPromiseFunc: any): Promise<WorkerInfo>;
}
interface TransportOneStrReturnFunc {
(): string;
}
interface TransportSendDataFunc {
(proc: any, data: any, _objectifyDataFunc: any): void;
}
interface TransportOnMgrMsgFunc {
(dataClb: any): void;
}
interface Transport {
newWorker: TransportNewWorkerFunc;
getMyRole: TransportOneStrReturnFunc;
sendData: TransportSendDataFunc;
defaultModuleOrFile: TransportOneStrReturnFunc;
sendDataToManager: TransportSendDataFunc;
registerOnMgrMsg: TransportOnMgrMsgFunc;
}
class PromiseCommunicationBase {
protected promiseIdx: number;
protected promises: Promises[];
constructor();
protected _buildFuncSendWithPromise: (sendFunc: any) => NormalSendFunction;
protected _makeResolveFunc(promiseId: number, sendFunc: NormalSendFunction, sendWorkDoneFunc?: NormalSendFunction): (data?: any, sendWorkDone?: boolean) => void;
protected _makeRejectFunc(promiseId: number, sendFunc: NormalSendFunction, sendWorkDoneFunc?: NormalSendFunction): (err?: any, sendWorkDone?: boolean) => void;
protected filterMsgIfPromised(data: any, promisedMsgClb: ManagerPromisedMsgClbFunction, sendFunc: NormalSendFunction, sendWorkDoneFunc?: NormalSendFunction): boolean;
}
class Manager extends PromiseCommunicationBase {
protected onWorkerMessage: WorkerMsgClbFunction;
protected onWorkerPromisedMessage: ManagerPromisedMsgClbFunction;
protected workers: WorkerInfo[];
/
* Class constructor for Manager - CEO and mid-level managers.
* @param onWorkerMessage Callback which will run when non-Promised message arrives to Manager from Worker.
*/
constructor(onWorkerMessage?: WorkerMsgClbFunction, onWorkerPromisedMessage?: ManagerPromisedMsgClbFunction);
protected onMessage(worker: WorkerInfo, data: any): void;
/
* Adds new Worker using child_process.fork() and links it with this Manager. This will return WorkerInfo instance with the possibilities to send messages and with unique name field.
* @param type String with name of type of Worker (for example 'worker' or 'readNode'). MUST BE ONE WORD, ONLY LETTERS.
* @param moduleOrFile Module or file to run (to be used as first parameter in child_process.fork()).
* @param options Options to pass to the Worker - may be anything.
* @param data Data about this Worker to store in this Manager. May by anything.
* @param opts Any options passed to transport.
*/
newWorker(type: string, moduleOrFile?: string, options?: any, data?: any, opts?: any): Promise<WorkerInfo>;
/
* Find WorkerInfo by Worker name.
* @param name Name of Worker.
*/
getWorker(name: string): WorkerInfo;
/
* Find array of WorkerInfo by Worker type.
* @param type Type of Worker.
*/
getWorkers(type: string): WorkerInfo[];
}
class Balancer extends Manager {
private queue;
/
* Class constructor for Balancer Manager - mostly it will be special mid-level manager.
* @param onWorkerMessage Callback which will run when non-Promised message arrives to Manager from Worker.
*/
constructor(onWorkerMessage?: WorkerMsgClbFunction);
private onQueueCheckInterval();
private findMostFreeWorker();
/
* Adds new Worker using child_process.fork() and links it with this Manager. This is special type of Worker which will be managed and balanced by this Balancer. For more information refer to Manager.newWorker() docs.
* @param type String with name of type of Worker (for example 'worker' or 'readNode'). MUST BE ONE WORD, ONLY LETTERS.
* @param moduleOrFile Module or file to run (to be used as first parameter in child_process.fork()).
* @param maxJobsAtOnce Maximum number of jobs that this Worker should do at once.
* @param options Options to pass to the Worker - may be anything.
* @param data Data about this Worker to store in this Manager. May by anything.
* @param forkOpts Any fork options (options : ForkOptions) you may use with child_process.fork().
*/
newBalancedWorker(type: string, maxJobsAtOnce: number, moduleOrFile?: string, options?: any, data?: any, forkOpts?: any): Promise<WorkerInfo>;
/
* Adds job to do by some of the best-suited Worker. Best-suited Worker is the one with the smallest amount of current jobs and with free space for next one. If no Worker can be found the job is queued and when any of the Workers will be free this job will be executed.
* @param data Any data you want to pass to the Worker.
*/
addJob(data?: any): void;
/
* Same as Balancer.addJob() but with Promises.
* @param data Any data you want to pass to the Worker.
*/
addJobWithPromise(data?: any): Promise<any>;
protected onMessage(worker: WorkerInfo, data: any): void;
}
class Worker extends PromiseCommunicationBase {
onManagerMessage: ManagerMsgClbFunction;
onManagerMessageWithPromise: ManagerPromisedMsgClbFunction;
private type;
private name;
private options;
/
* Sends normal, Promised message to closest Manager.
* @param data Any data you want to pass to the Manager.
*/
sendWithPromise: NormalSendFunction;
/
* Class constructor for Worker - it will be any worker including mid-level manager.
* @param onManagerMessage Callback which will run when non-Promised message arrives to Worker from Manager.
* @param onManagerMessageWithPromise Callback which will run when Promised message arrives to Worker from Manager.
*/
constructor(onManagerMessage?: ManagerMsgClbFunction, onManagerMessageWithPromise?: ManagerPromisedMsgClbFunction);
/
* Reads type name of Worker passed by Manager to this Worker while forking it.
*/
getType(): string;
/
* Reads options passed by Manager to this Worker while forking it.
*/
getOptions(): any;
/
* Reads name of Worker passed by Manager to this Worker while forking it.
*/
getName(): string;
private onMessage;
/
* Sends normal, non-Promised message to closest Manager.
* @param data Any data you want to pass to the Manager.
*/
send: (data?: any) => void;
/
* Sends work-done, non-Promised message to closest Manager. This is usually answer for Balancer Manager.
* @param data Any data you want to pass to the Manager.
*/
sendWorkDone: (data?: any) => void;
}
/
* Returns true if this is main process.
*/
function isCEO(): boolean;
/
* Reads type name of Worker passed by Manager to this Worker while forking it or empty string for main CEO process.
*/
function getMyRole(): string;
const useTransportCpFork: () => ChildProcessForkTransport;
const useTransportPm2: (pm2: any) => Pm2Transport;
function setTransport(transp: Transport): void;
}
export = Collab;
Here you may find some pure examples of usage of collab-ms. Examples are ready-to-use, just copy&paste code into index.js file and run it.
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
switch(collab.getMyRole()) {
case '': //Main/CEO
(new collab.Manager((worker, msg) => {
console.log('Msg', msg, 'from worker');
})).newWorker('worker');
break;
case 'worker': //Worker
(new collab.Worker()).send('Hi, ready to work!');
break;
}
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
switch(collab.getMyRole()) {
case '': //Main/CEO
const ceo = new collab.Manager((worker, msg) => {
console.log('Msg', msg, 'from worker', worker.name);
});
for (let i = 0; i < 3; i++) {
ceo.newWorker('worker').then( worker => {
const name = worker.name;
console.log('New worker is named', name);
});
}
break;
case 'worker': //Worker
const worker = new collab.Worker();
worker.send('Hi, ready to work!');
break;
}
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
switch(collab.getMyRole()) {
case '': //Main/CEO
(new collab.Manager()).newWorker('worker').then( worker => {
worker.sendWithPromise('Hi!').then(ans => {
console.log('Answer from worker:', ans);
});
});
break;
case 'worker': //Worker
new collab.Worker(null, (data, resolve, reject) => {
console.log('Request from CEO:', data);
resolve('Oh hey!');
});
break;
}
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
switch(collab.getMyRole()) {
case '': //Main/CEO
const ceo = new collab.Manager();
ceo.newWorker('mathWorker').then( mathWorker => {
mathWorker.sendWithPromise({
a: 2,
b: 3
}).then(ans => {
console.log('Answer from worker', ans);
}).catch(err => {
console.log('Error in worker', err);
});
});
break;
case 'mathWorker': //Worker
const worker = new collab.Worker(null, (data, resolve, reject) => {
resolve( data.a + data.b );
});
break;
}
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
switch(collab.getMyRole()) {
case '': //Main/CEO
(new collab.Manager()).newWorker('worker').then( worker => {
worker.sendWithPromise('Can you work?').then(ans => {
console.log('Answer from worker:', ans);
}).catch(err => {
console.log('Error in worker:', err);
});
});
break;
case 'worker': //Worker
new collab.Worker(null, (data, resolve, reject) => {
console.log('Request from CEO:', data);
if (data.indexOf('work') > -1)
reject('704 Motivation Not Found');
});
break;
}
Both non-Promised and Promissed messages.
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
switch(collab.getMyRole()) {
case '': //Main/CEO
//We can receive "random input"
const ceo = new collab.Manager((worker, data) => {
console.log('Random input: ping from ' + worker.name);
});
ceo.newWorker('mathWorker').then( mathWorker => {
//And still we can send Promised call - when we expect answer
mathWorker.sendWithPromise({
a: 2,
b: 3
}).then(ans => {
console.log('Answer from worker', ans);
}).catch(err => {
console.log('Error from worker', err);
});
});
break;
case 'mathWorker': //Worker
const worker = new collab.Worker(null, (data, resolve, reject) => {
resolve( data.a + data.b );
});
setInterval(() => {
worker.send({}); // Just ping
}, 1000);
break;
}
The idea here is to sum all of the numbers in array multipied by 2. We also create tree structure:
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
switch(collab.getMyRole()) {
case '': //Main/CEO
const ceo = new collab.Manager();
ceo.newWorker('balancer').then( balancer => {
setTimeout(() => {
balancer.sendWithPromise({
data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
}).then(ans => {
console.log('Answer from balancer', ans);
}).catch(err => {
console.log('Error from balancer', err);
});
}, 10000); //Note this is here as fast patch for slow pm2 process spawning
});
break;
case 'balancer': //Banalcer/queue
const balancer = new collab.Balancer();
let promises = [];
for (let i = 0; i < 3; i++) {
promises.push(balancer.newBalancedWorker('mathWorker', 2));
}
Promise.all(promises).then(() => {
//Im also Worker
const worker = new collab.Worker(null, (data, resolve, reject) => {
console.log('Balancer got a job!', data);
let promises = [];
data.data.forEach(number => {
promises.push(balancer.addJobWithPromise({
number
}));
});
Promise.all(promises).then((numbers) => {
const sum = numbers.reduce( ( acc, cur ) => acc + cur.result, 0 );
resolve(sum);
}).catch(reject);
});
});
break;
case 'mathWorker': //Worker
new collab.Worker(null, (data, resolve, reject) => {
setTimeout(() => {
resolve({
result : data.number * 2
}, true);
}, 2000);
});
break;
}
### Big tree structure - a lot of levels This is usually not good idea, but it works. Please note that Worker names should be unique across the app!
This example is slow on pm2 Transport (take a look at pm2 Transport notes).
const collab = require('collab-ms');
collab.setTransport( collab.useTransportCpFork() );
const structureDepth = 10;
switch(collab.getMyRole()) {
case '': //Main/CEO
(() => {
const ceo = new collab.Manager((worker, data) => {
console.log('Input message from', worker.name, 'data is', data);
});
ceo.newWorker('nextLvlWorker', 'index.js', 1);
})();
break;
default:
(() => {
//Im also worker
const meWorker = new collab.Worker();
const myLvl = parseInt(meWorker.getOptions());
const meManager = new collab.Manager((worker, data) => {
console.log('Passing message from level', myLvl+1, 'to level', myLvl);
data.levels.push(myLvl);
meWorker.send(data);
});
console.log('Created nextLvlWorker level', myLvl);
if (myLvl == structureDepth){
meWorker.send({
levels:[structureDepth],
info:'OK!',
});
} else {
meManager.newWorker('nextLvlWorkerlvl' + (myLvl+1), 'index.js', myLvl+1);
}
})();
break;
}
There are a lot of things to do or to add maybe later. The most important is to write tests right now to make it really high-qualiy.
* 1.0.1 Fix of prod pm2 environment under Linux. Fixed sending Promised messages up to Manager. * 1.0.0 pm2 support! Added Transports and some breaking changes :( Updated and improved docs a little bit. * 0.2.4 Fix of Promised messages (this for resolve/reject fixed). * 0.2.3 Fix of Promised messages in Balancer. * 0.2.2 Added possibility to chain Promised messages. Updated reference/docs. * 0.2.1 Small fixes of two-way Promises. * 0.2.0 Added two-way Promises. Both Manager and Worker may now send Promised messages and wait for response. * 0.1.1 First usable public version.
MIT licensed. Created by Jakub Król, IT.focus.
Files |
File | Role | Description | ||
---|---|---|---|---|
build (6 files) | ||||
docs (1 directory) | ||||
src (3 files, 1 directory) | ||||
Gulpfile.js | Aux. | Auxiliary script | ||
LICENSE | Lic. | License text | ||
package.json | Data | Auxiliary data | ||
README.md | Doc. | Documentation |
Files | / | build |
File | Role | Description |
---|---|---|
collab.d.ts | Example | Example script |
collab.js | Class | Class source |
transCp.d.ts | Example | Example script |
transCp.js | Class | Class source |
transPm2.d.ts | Example | Example script |
transPm2.js | Class | Class source |
Files | / | docs | / | images |
File | Role | Description |
---|---|---|
diagram-balancer-2-way.png | Icon | Icon image |
diagram-balancer.png | Icon | Icon image |
diagram-collab-1.png | Icon | Icon image |
diagram-collab-2.png | Icon | Icon image |
Files | / | src |
File | Role | Description | ||
---|---|---|---|---|
typings (3 files) | ||||
collab.ts | Class | Class source | ||
transCp.ts | Class | Class source | ||
transPm2.ts | Class | Class source |
Files | / | src | / | typings |
File | Role | Description |
---|---|---|
bluebird.d.ts | Class | Class source |
node.d.ts | Class | Class source |
vendor.d.ts | Data | Auxiliary data |
Version Control | Unique User Downloads | Download Rankings | |||||||||||||||
100% |
|
|
Applications that use this package |
If you know an application of this package, send a message to the author to add a link here.