async/awaitで配列要素の非同期処理を書く

二年ぶりぐらいにNodeを書いているのだが、寝ている間にコールバック地獄はasync/awaitの導入によって終了していたらしい。ほほーう。

で、配列要素の非同期処理である。コールバック時代はnpmのasyncモジュールを使うのが一般的だったが、async/await時代の今はどうするのか?

逐次処理

普通にループすればいいだけ。ま、そのための構文だからな。async/await

function someAsyncTask(i) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(i ** 2);
    }, i * 100);
  });
}

const arr = [1, 2, 3, 4];

(async () => {
  console.time('series');

  const results = [];
  for (const i of arr) {
    const r = await someAsyncTask(i);
    results.push(r);
  }
  console.log(results);
  console.timeEnd('series');
})();
node series.js
[ 1, 4, 9, 16 ]
series: 1016.411ms

並列処理

配列をmapでpromiseに変換し、Promise.allに渡す。

(async () => {
  console.time('parallel');

  const promises = arr.map((e) => someAsyncTask(e));
  const results = await Promise.all(promises);

  console.log(results);
  console.timeEnd('parallel');
})();
$ node paralle.js
[ 1, 4, 9, 16 ]
parallel: 410.257ms

並列数を指定して逐次処理

ここからが本題。

例えば100万行のjson linesを処理してDBに入れるという処理をnodeで書くことを考える。1行処理するのに50msかかったとすると、逐次処理では14時間もかかってしまう。並列処理では100万のinsertを無制限に投げ続けたらDB側の処理が追いつかずエラーになってしまう。

こういう時は並列数を指定して逐次実行したい。

方法1

逐次処理と並列処理を組み合わせ。sliceで並列数だけ要素を取り出してpromiseの配列にしてPromise.allに渡す。これをループする

function someAsyncTask(i) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(i ** 2);
    }, i * 100);
  });
}

const arr = [1, 4, 2, 1, 2, 10, 5, 1, 2, 2];
const concurrency = 3;

(async () => {
  console.time('throttle');

  const results = []
  for (let i = 0; i < arr.length; i += concurrency) {
    const p = arr.slice(i, i + concurrency).map((e) => someAsyncTask(e));
    const r = await Promise.all(p);
    results.push(r);
  }

  console.log(results);
  console.timeEnd('throttle');
})();
$ node throttle1.js
[ [ 1, 16, 4 ], [ 1, 4, 100 ], [ 25, 1, 4 ], [ 4 ] ]
throttle: 2121.252ms

しかしこれはあまり出来が良くない。数字の合計は30で3並列で処理するのだから1000msぐらいで終わって欲しいのだが、2000ms以上かかっている。Promise.all では時間がかかる処理が1つあると、他の処理がその終了を待つため効率が下がってしまう。

方法2

というわけでよくあるワーカー的な処理に改良。複数のワーカーを作って終わったやつから次のデータを処理させる。

function someAsyncTask(i) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(i ** 2);
    }, i * 100);
  });
}

const arr = [1, 4, 2, 1, 2, 10, 5, 1, 2, 2];
const concurrency = 3;

(async () => {
  console.time('throttle');

  let counter = 0;
  const workers = Array(concurrency).fill().map(async () => {
    const results = [];
    while(counter < arr.length) {
      const e = arr[counter];
      counter += 1;
      const r = await someAsyncTask(e);
      results.push(r);
    }
    return results;
  });
  const results = await Promise.all(workers);

  console.log(results);
  console.timeEnd('throttle');
})();
$ node throttle2.js
[ [ 1, 1, 100 ], [ 16, 25 ], [ 4, 4, 1, 4, 4 ] ]
throttle: 1216.769ms

Node.jsはシングルスレッドなのでワーカーが同一のカウンタ変数や配列を参照するような雑なコードでも正しく動く。ミューテックスセマフォなどで排他制御したり、ロックフリーなキュー実装を使ったりする必要もない。Nodeはこういう時に楽でいい。

エラーハンドリングなど何もしてないので、実際に使うならば同じような処理をするnpmライブラリを探すといいと思います。多分あるでしょ、知らんけど。