【Node.js】 並列処理をおこなう三つのモジュールの概要

更新日:2024/02/27

Node.jsではworker_threads、child_process、clusterの3つのモジュールが用意されていて、それぞれ異なる方式で並列処理を実現できます。
しかし3つもあると、それを使ったらいいか迷ってしまいます。
そこで各モジュールの概要と、簡単なコードをお伝えします。

 

worker_threadsでの並列処理

worker_threadsはサブとなるスレッド(ワーカースレッド)を作成して、交互に通信を行うことができます。
これはブラウザのWebWorkerAPIと似た機能です。

時間がかかる処理を並列化したいならworker_threadsを使用します。

後述の二つのモジュールが作成するのはプロセスですが、worker_threadsはスレッドです。
スレッドはプロセス内に複数作成でき、プロセス内のスレッドでデータを共有できます。
しかしプロセス間では、データの共有ができません。

プロセス内のスレッドでデータを共有できる。プロセス間では、データの共有ができない

そのため、大きなデータの受け渡しをおこないたいときはworker_threadsが効率的です。

メインスレッド


const {Worker} = require("worker_threads");

    // ①ワーカーの作成
const worker = new Worker( "./sub_worker.js",{workerData:{dataId:2}});

    // ②ワーカーからの受信設定
worker.on("message", msg => {
    console.log(`受信:${ msg }`); // 受信:返信:buff1=102 , buff2=202
});

const u8_1 = new Uint8Array( [100,101,102,103,104] );
const u8_2 = new Uint8Array( [200,201,202,203,204] );

    // ③ワーカーへのメッセージ送信
worker.postMessage({buff1:u8_1,buff2:u8_2},[u8_1.buffer]);
console.log( u8_1.length , u8_2.length); // 0 , 5

    // ④ワーカーを終了させる
setTimeout(()=>worker.terminate(),5000);

ワーカースレッド


const {parentPort,workerData} = require("worker_threads");

const dataId = workerData.dataId;

parentPort.on("message", msg => {
    parentPort.postMessage(`返信:buff1=${ msg.buff1[dataId]} , buff2=${ msg.buff2[dataId]}`);
});

簡単に処理を解説してみます。

①new Worker( )

子スレッドを起動します。
起動したスレッドはワーカースレッドと呼ばれます。

1番目の引数は、子スレッドで実行するスクリプトファイルへのパスです。
2番目の引数は、様々な設定がおこなえるオプション値です。これは省略できます。
コードでは、子スレッドで受け取り可能なworkerDataを設定しています。
workerDataの受け取り方法は、ワーカースレッドのコードを見てください。

②worker.on("message",msg=>{} )

子スレッドからのメッセージ受信を設定しています。

WebWorkerAPIのリスナー引数はMessageEventオブジェクトで、dataプロパティに子スレッドからのデータがセットされていますが、worker_threadsは子スレッドからのデータを直接受け取ります。

③worker.postMessage( )

ワーカースレッドにデータを送信します。
これはWebWorkerAPIと同じです。

詳しくはWebWorkerAPIについて書いた記事の、次の項目を読んでみてください。
スレッド間のデータの転送について

④worker.terminate()

ワーカースレッドが動作中は、メインスレッドは終了しません。
ワーカースレッドを終了させると、メインスレッドは終了します。

※他のイベントを待っているときは、終了しません。

 

child_processによる並列処理

child_processは、子プロセスを作成します。
子プロセスは独立したメモリ領域を持っているため、データの共有はできません。

プロセス間通信で受け渡されるデータは、JSONまたはNode.jsに組み込まれているV8モジュールのSerialization APIでシリアライズされます。Serialization APIはJSONでサポートされていない、BigInt、MapとSet、ArrayBufferとTypedArray、Buffer、Error、RegExpなどをシリアライズできます。

プロセスはスレッドと比較して作成するためのメモリ量が大きかったり、動作するまでの時間が長いなどのデメリットがあります。

親プロセス


const child_process = require("child_process");

    // ①子プロセスの作成
const child1 = child_process.fork( "./child", ["こんにちは"] , {env:{id:1}});
const child2 = child_process.fork( "./child", ["こんばんは"] , {env:{id:2}});

   // ②子プロセスからのメッセージ受け取り設定
[child1,child2].forEach(e=>{
    e.on("message", msg => console.log(msg));
});
   // ③2秒後に子プロセスにメッセージ送信
setTimeout( ()=>{
    [child1,child2].forEach(e=>e.send({time:Date.now()}));
},2000);
   // ④5秒後に子プロセス停止
setTimeout( ()=>{
    [child1,child2].forEach(e=>e.disconnect());
},5000);

子プロセス:child.js


const [appName,jsName,arg] = process.argv;
const id = process.env.id;

process.on("message",  msg => {
    process.send( `${jsName}:${arg}(${id}):${msg.time}`);
});

簡単に処理を解説してみます。

①child_process.fork

子プロセスを作成しjsファイルを実行します。

2番目以降の引数は省略可能です。
2番目の引数は子プロセスにargvとして渡す値です。
3番目はオプションで、ここではenvを渡しています。

受け取り方法は、子プロセスのコードを見てください。

オプションのserializationプロパティで、データのシリアライズ方法を指定できます。

JSONを使用:{ serialization: 'json' }
Serialization APIを使用:{ serialization: 'advanced' }

②child_process.on( "message" , msg=>{} )

子プロセスからのメッセージを処理するリスナーを登録しています。

③child_process.send( )

子プロセスにメッセージを送信しています。

④child_process.disconnect( )

子プロセスを全て終了して、親プロセスを終了できるようにしています。

 

clusterによる並列処理

clusterモジュールは自プロセスを子プロセスとして複製し、コンピューターに搭載されている複数のCPUコアで実行できるように割り振り、コンピューター資源を無駄なく使うことを目的としています。

子プロセス親ロセスの複製なので、同じプログラムコードを処理します。
そのため一つのスクリプトファイルで、親が子を管理するようにプログラムを組む必要があります。


const  cluster = require("cluster");

if(cluster.isPrimary) { // 親プロセス判定

    const cpuNum = require('os').cpus().length; // CPUコアの数

       // ①子プロセスの起動 ②メッセージの受信設定
    for (let i = 0; i < cpuNum; i++) {
        cluster.fork({ msg: `ID${i}` })
           .on("message", msg => console.log(msg));
    }

       // 5秒後に子プロセスを停止
    setTimeout( ()=>{
        cluster.disconnect();
    },5000);

}else{ // 子プロセスでの処理
    const msg = process.env.msg;
        // ③親スプセスにメッセージを送信
    process.send( `ワーカーからのメッセージ(${msg})` );
}

①cluster.fork( )

子プロセスの起動は、child_processと同じようにforkメソッドでおこないます。
ただし引数が異なります。
cluster.forkは子プロセスのenvの値を指定します。
この引数は省略可能です。

cluster.forの戻り値は、作成したプロセスを表すWorkerオブジェクトです。

②Worker.on( "message" , ()=>{} )

子プロセスからのメッセージを処理するリスナーを登録しています。

③process.send( )

親プロセスにメッセージを送信しています。

clusterはポートからの入力を子プロセスに割り振る機能を持っています。
そのため、clusterはWebサーバー構築の手段として用いられることが多いです。

次のコードは、上のコードの子プロセス処理部分を変更したものです。

簡易的なWebサーバー


}else{ // 子プロセスでの処理
    require("http").createServer((req, res) => {
        res.writeHead(200,{'Content-Type': 'text/html; charset=utf-8'});
        res.end(`<p>${msg}</p>`);
       process.send( msg );
    }).listen(44001);

    const msg = `リクエスト受付(${process.env.msg})`;
}

上のコードを実行後にブラウザのアドレスバーに http://localhost:44001/と入力すると、親プロセスが子プロセスに処理を割り振り、変数msgの値がブラウザに表示されます。

更新日:2024/02/27

書いた人(管理人):けーちゃん

スポンサーリンク

記事の内容について

null

こんにちはけーちゃんです。
説明するのって難しいですね。

「なんか言ってることおかしくない?」
たぶん、こんなご意見あると思います。

裏付けを取りながら記事を作成していますが、僕の勘違いだったり、そもそも情報源の内容が間違えていたりで、正確でないことが多いと思います。
そんなときは、ご意見もらえたら嬉しいです。

掲載コードについては事前に動作確認をしていますが、貼り付け後に体裁を整えるなどをした結果動作しないものになっていることがあります。
生暖かい視線でスルーするか、ご指摘ください。

ご意見、ご指摘はこちら。
https://note.affi-sapo-sv.com/info.php

 

このサイトは、リンクフリーです。大歓迎です。