二年ぶりぐらいにNodeを書いているのだが、寝ている間にコールバック地獄はasync/awaitの導入によって終了していたらしい。ほほーう。
で、配列要素の非同期処理である。コールバック時代はnpmのasyncモジュールを使うのが一般的だったが、async/await時代の今はどうするのか?
逐次処理
普通にループすればいいだけ。ま、そのための構文だからな。async/await
function someAsyncTask(i) { return new Promise((resolve, reject) => { setTimeout(() => { resolve(i ** 2); }, i * 100); }); } const arr = [1, 2, 3, 4]; (async () => { console.time('series'); const results = []; for (const i of arr) { const r = await someAsyncTask(i); results.push(r); } console.log(results); console.timeEnd('series'); })();
node series.js [ 1, 4, 9, 16 ] series: 1016.411ms
並列処理
配列をmapでpromiseに変換し、Promise.allに渡す。
(async () => { console.time('parallel'); const promises = arr.map((e) => someAsyncTask(e)); const results = await Promise.all(promises); console.log(results); console.timeEnd('parallel'); })();
$ node paralle.js [ 1, 4, 9, 16 ] parallel: 410.257ms
並列数を指定して逐次処理
ここからが本題。
例えば100万行のjson linesを処理してDBに入れるという処理をnodeで書くことを考える。1行処理するのに50msかかったとすると、逐次処理では14時間もかかってしまう。並列処理では100万のinsertを無制限に投げ続けたらDB側の処理が追いつかずエラーになってしまう。
こういう時は並列数を指定して逐次実行したい。
方法1
逐次処理と並列処理を組み合わせ。sliceで並列数だけ要素を取り出してpromiseの配列にしてPromise.allに渡す。これをループする
function someAsyncTask(i) { return new Promise((resolve, reject) => { setTimeout(() => { resolve(i ** 2); }, i * 100); }); } const arr = [1, 4, 2, 1, 2, 10, 5, 1, 2, 2]; const concurrency = 3; (async () => { console.time('throttle'); const results = [] for (let i = 0; i < arr.length; i += concurrency) { const p = arr.slice(i, i + concurrency).map((e) => someAsyncTask(e)); const r = await Promise.all(p); results.push(r); } console.log(results); console.timeEnd('throttle'); })();
$ node throttle1.js [ [ 1, 16, 4 ], [ 1, 4, 100 ], [ 25, 1, 4 ], [ 4 ] ] throttle: 2121.252ms
しかしこれはあまり出来が良くない。数字の合計は30で3並列で処理するのだから1000msぐらいで終わって欲しいのだが、2000ms以上かかっている。Promise.all では時間がかかる処理が1つあると、他の処理がその終了を待つため効率が下がってしまう。
方法2
というわけでよくあるワーカー的な処理に改良。複数のワーカーを作って終わったやつから次のデータを処理させる。
function someAsyncTask(i) { return new Promise((resolve, reject) => { setTimeout(() => { resolve(i ** 2); }, i * 100); }); } const arr = [1, 4, 2, 1, 2, 10, 5, 1, 2, 2]; const concurrency = 3; (async () => { console.time('throttle'); let counter = 0; const workers = Array(concurrency).fill().map(async () => { const results = []; while(counter < arr.length) { const e = arr[counter]; counter += 1; const r = await someAsyncTask(e); results.push(r); } return results; }); const results = await Promise.all(workers); console.log(results); console.timeEnd('throttle'); })();
$ node throttle2.js [ [ 1, 1, 100 ], [ 16, 25 ], [ 4, 4, 1, 4, 4 ] ] throttle: 1216.769ms
Node.jsはシングルスレッドなのでワーカーが同一のカウンタ変数や配列を参照するような雑なコードでも正しく動く。ミューテックスやセマフォなどで排他制御したり、ロックフリーなキュー実装を使ったりする必要もない。Nodeはこういう時に楽でいい。
エラーハンドリングなど何もしてないので、実際に使うならば同じような処理をするnpmライブラリを探すといいと思います。多分あるでしょ、知らんけど。