kakts-log

programming について調べたことを整理していきます

node.js Promiseを使った非同期処理

node.jsの非同期処理を書く際に便利なPromiseについて紹介します。
主にnode.js 4.x系以降でデフォルト利用可能な機能になります。

Promiseとは

非同期処理を抽象化したオブジェクトで、非同期処理に関する統一的なインターフェースを提供します。
これにより、非同期処理のコールバック関数の書き方が明確になり、可読性があがるというメリットがあります。
ES6(ECMA Script 6)により仕様が定められており、node.js 0.12系以降から徐々にES6の機能が導入されていて。
node.js 4系以降からはデフォルトで使えるようになっています。

http://www.ecma-international.org/ecma-262/6.0/index.html#sec-promise-objects

Node.js ES2015/ES6, ES2016 and ES2017 support

Promiseをどうやって使うか

では、Promiseをつかうってどうやって非同期処理を書くかざっくりとまとめてみると、以下の通りになります。

Promiseクラスのオブジェクトを作成する。
作成したpromiseオブジェクトに対して、成功時と失敗時に行う処理を関数として登録する。

この成功・失敗したときのコールバック関数は、 promise.then()というメソッドで登録することができます。

promiseオブジェクトの作り方

// resolveが呼ばれる
const promise  = new Promise((resolve, reject) {
  const errFlg = false;
  // 何か処理が失敗したときにrejectを呼び出す。
  // このコードではerrFlg = falseなので呼ばれない
  if (errFlg) {
    reject(new Error("An error has occured");
  }
  // 成功時にresolveを呼び出す
  resolve(100);
});

// resolve(成功)が呼ばれた時の処理
const onFulFilled = (value) => {
  console.log("---resolve called--- value: ", value);
};

// reject(失敗)が呼ばれたときの処理
const onRejected = (error) => {
  console.error(error);
};

// .then(成功時の処理, 失敗時の処理)を登録する
promise
  .then(onFulFilled, onRejected);

promiseオブジェクトを使った非同期処理として、成功時にはnew Promiseで渡した関数の第1引数にあるresolve関数、エラー時にはreject関数を呼び出すようにします。

上記の非同期処理の成功・失敗時の処理は、Promise.thenメソッドで指定することができます。

resolve関数呼出し時に引数に値を設定すると、.thenメソッドで指定したコールバック関数で値を受け取ることができます。 そして、処理が失敗した場合は、.thenで指定したコールバック関数は呼ばれずに、.catchで指定したコールバック関数が呼ばれるようになります。
こうすることで、非同期処理の成功時・失敗時の処理をかき分けることができ、可読性があがります。

Promiseオブジェクトの状態について

Promiseオブジェクトには、3つの状態をもっており、ES6 Promisesの仕様で以下のように定められています。

  • Pending: オブジェクトの作成直後の状態
  • FulFIlled: resolveした時
  • Rejected: エラーが発生してrejectした時がRejectedに変化します。

v8/v8.h at 818c6d0ba9e92e3bb0557fabaad886f7cfd9ea8a · v8/v8 · GitHub

node/v8.h at ed12ea371c5352b4b13f9f1c4e0f577fbd30bb2a · nodejs/node · GitHub

下記の図にあるとおり、Promiseの状態は一度FulFilledまたはRejectedに遷移した後、状態が固定され、変化することはありません。

f:id:kakts:20170407003706p:plain
状態が変化したときに、.thenで登録したonFulFilled, onRejectedの関数のどちらかが一度限りだけ呼ばれることになります。
後述しますが、.thenは新たなpromise objectを返すので、メソッドチェーンの形で複数の.thenを登録できます。
.thenを書いた順に処理が走り、毎回promise objectが作成され(Pending)、処理に応じてFulFilled・Rejectedの処理を連続して行うことができます。

厳密ではないですが、.thenのメソッドチェーンを行う際には以下の図ような感じでチェーンのたびに新たにobjectが作られて処理が進んでいきます。 f:id:kakts:20170409150743p:plain

promise.thenを使った成功・失敗時のコールバックの登録

メソッドの仕様は以下のとおりになっています

promise.then(onFulFilled, onRejected)

resolveした時、onFulFilledが呼ばれ、反対にrejectしたときはonRejectedが呼ばれます。 このonFulFilled, onRejectedは必須ではないのでどちらか一方のみ登録することが可能です。 syntax sugarとして、 .catchメソッドも用意されていて、エラー時の処理のみ登録したい場合は.catchを使うと便利です。
実際には.catchは以下の処理と同じです。

promise.then(undefined, onRejected);

node.jsのjavascript エンジンであるv8では、以下の箇所で.catchの実装がされています。
node/builtins-promise.cc at ed12ea371c5352b4b13f9f1c4e0f577fbd30bb2a · nodejs/node · GitHub

TF_BUILTIN(PromiseCatch, PromiseBuiltinsAssembler) {
  // 1. Let promise be the this value.
  Node* const promise = Parameter(0);

  // on_resolveをundefinedにしている。
  Node* const on_resolve = UndefinedConstant();
  Node* const on_reject = Parameter(1);
  Node* const context = Parameter(4);

  Label if_internalthen(this), if_customthen(this, Label::kDeferred);
  GotoIf(TaggedIsSmi(promise), &if_customthen);
  BranchIfFastPath(context, promise, &if_internalthen, &if_customthen);

  Bind(&if_internalthen);
  {
    Node* const result =
        InternalPromiseThen(context, promise, on_resolve, on_reject);
    Return(result);
  }

  Bind(&if_customthen);
  {
    Isolate* isolate = this->isolate();
    Node* const then_str = HeapConstant(isolate->factory()->then_string());
    Callable getproperty_callable = CodeFactory::GetProperty(isolate);
    Node* const then =
        CallStub(getproperty_callable, context, promise, then_str);
    Callable call_callable = CodeFactory::Call(isolate);

    // on_resolveをundefined、on_rejectは.catchの第1引数に登録されたものを使っている
    Node* const result =
        CallJS(call_callable, context, then, promise, on_resolve, on_reject);
    Return(result);
  }
}

promiseオブジェクトの.then .catchメソッドは返り値として新たにpromiseオブジェクトを返すため、
メソッドチェーンをつかって複数の処理を連続して行うことができます。

// new Promiseで promiseオブジェクトを作成する
const promise = new Promise((resolve, reject) => {
  // 処理が正常に終了した場合 resolveを呼ぶ
  resolve(100);
});

// 上記の処理が成功・失敗したときのコールバック処理を登録する
// 成功時の処理 .thenの第1引数に渡す
// 失敗時の処理 .catchの第1引数に渡す
promise
  .then((value) => {
    console.log("----then1 value", value);

    // return した値valueがpromiseオブジェクト以外の場合暗黙的にresolve(value)が呼ばれ、
    // 次のthenメソッドが呼ばれる
    // これをコメントアウトすると次のthenにはvalue=undefinedが渡る 
    // 関数でreturnを書かない場合 暗黙的にundefinedが返されるため
    return 200;
  })
  .then((value) => {
    console.log("----then2 value", value);
  })
  .catch((error) => {
    console.error(error);
  });

そして .thenをメソッドチェーンで複数続けて書く場合、1番目のthenの中でエラーが発生してrejectが呼ばれた場合、2番目のthenは呼ばれずにcatchで指定されたコールバックが実行されます。
下記に例をしめしているのでコードを読むとわかりやすいです。

const taskA = () => {
  console.log("Task A");
  // throwすることでreject(new Error())を呼び出すと同義になる
  throw new Error("throw Error @ Task A");
};

const taskB = () => {
  console.log("Task B"); // 呼ばれない
};

const onRejected = (error) => {
  console.error(error); // => "throw Error @ Task A"
}

const finalTask = () => {
  console.log("Final Task");
}

const promise = Promise.resolve();
promise
  .then(taskA) // ここでエラーが投げられる
  .then(taskB) // 呼ばれない
  .catch(onRejected) // エラー処理
  .then(finalTask); // 呼ばれない

このコードで注意する必要があるのは、
メソッドチェーンの途中で.catchの後ろに.thenをつなげているのですが、
最後の.thenのコールバック関数でエラーが起きたときに、これより後ろに.catchがつながっていないためエラーを拾うことができないので注意が必要です。

複数のPromise objectに対する処理

Promiseには、複数のpromise objectに対して処理を行うインターフェースも用意されています。 promise.all と promise.raceの2つがあり、ここで紹介していきます。

Promise.all

Promise.allは 複数のpromiseオブジェクトの配列を引数に取るもので、 配列中のpromise objectを並列で実行し、すべてのpromise objectがresolveされた時、初めて.thenが呼ばれます。 .thenには 配列中の各オブジェクトでresolveに渡された値が配列で渡ってくるので、それを利用することができます。

const aPromise = new Promise((resolve, reject) => {
  resolve(4);
});

const bPromise = new Promise((resolve, reject) => {
  resolve(5);
});

// .allはpromiseオブジェクトの配列を引数に取る
// 引数に渡したpromiseオブジェクトのすべてがresolveされたときに次の.then を呼ぶ
Promise
  .all([aPromise, bPromise])
  .then((value) => {
    console.log('resolved')
    console.log(value); // [4, 5]を返す
  })
  .catch((error) => {
    console.error('error');
    console.error(error);
  });

v8では以下の箇所で実装されています。
node/promise.js at master · nodejs/node · GitHub

function PromiseAll(iterable) {
  if (!IS_RECEIVER(this)) {
    throw %make_type_error(kCalledOnNonObject, "Promise.all");
  }

  // false debugEvent so that forwarding the rejection through all does not
  // trigger redundant ExceptionEvents
  var deferred = %new_promise_capability(this, false);
  var resolutions = new InternalArray();

  var count;

  // For catch prediction, don't treat the .then calls as handling it;
  // instead, recurse outwards.
  var instrumenting = DEBUG_IS_ACTIVE;
  if (instrumenting) {
    SET_PRIVATE(deferred.reject, promiseForwardingHandlerSymbol, true);
  }

  function CreateResolveElementFunction(index, values, promiseCapability) {
    var alreadyCalled = false;
    return (x) => {
      if (alreadyCalled === true) return;
      alreadyCalled = true;
      values[index] = x;
      // すべてのpromise objectが成功したのでresolve発火
      if (--count === 0) {
        var valuesArray = [];
        %MoveArrayContents(values, valuesArray);
        %_Call(promiseCapability.resolve, UNDEFINED, valuesArray);
      }
    };
  }

  try {
    var i = 0;
    // resolveに必要な残りpromise object数
    // 0になったらresolve発火
    count = 1;
    for (var value of iterable) {
      var nextPromise = this.resolve(value);
      ++count;
      var throwawayPromise = nextPromise.then(
          CreateResolveElementFunction(i, resolutions, deferred),
          deferred.reject);
      // For catch prediction, mark that rejections here are semantically
      // handled by the combined Promise.
      if (instrumenting && %is_promise(throwawayPromise)) {
        SET_PRIVATE(throwawayPromise, promiseHandledBySymbol, deferred.promise);
      }
      ++i;
    }

    // 6.d
    // すべてのpromise objectが成功したのでresolve発火
    if (--count === 0) {
      var valuesArray = [];
      %MoveArrayContents(resolutions, valuesArray);
      %_Call(deferred.resolve, UNDEFINED, valuesArray);
    }

  } catch (e) {
    // エラーになったら即座にcatchされ、rejectされる
    %_Call(deferred.reject, UNDEFINED, e);
  }
  return deferred.promise;
}

Promise.race

Promise.allと同様に、複数のpromiseオブジェクトを引数にとるのがPromise.raceです。
.raceは、.allと違って、登録した複数のpromiseオブジェクトのうち、一番早く処理が終わったもののresolveの結果を.thenに渡します。

// promise.replace
// promiseオブジェクトの配列を引数に渡す
// 引数に渡したオブジェクトの中でどれか一つでもFulFilledまたRejectedになったら次の処理を実行する

const time400 = new Promise((resolve, reject) => {
  console.log("time400");
  setTimeout(() => {
    resolve(400);
  }, 400);
});

const time100 = new Promise((resolve, reject) => {
  console.log("time100");
  setTimeout(() => {
    resolve(100);
  }, 100);
});

const time4000 = new Promise((resolve, reject) => {
  console.log("time4000");
  setTimeout(() => {
    resolve(4000);
  }, 4000);
});

const time700 = new Promise((resolve, reject) => {
  console.log("time700");
  setTimeout(() => {
    resolve(700);
  }, 700);
});

Promise
  .race([time400, time100, time4000, time700])
  .then((value) => {
    // time100が一番早くresolveが呼ばれるので 100が渡ってくる
    console.log("then value:", value);
  })
  .catch((error) => {
    console.error("error");
    console.error(error);
  });

ポイントとしては、引数に渡したすべてのpromiseオブジェクトの処理は実行されるが、一番早くresolveが呼ばれたものに対して.thenへ値が渡されることです。
上記のコードの実行結果として、各promiseオブジェクトのconsole.logが出力されているのが確認できると思います。

promise.raceのv8での実装は以下の通りになります。
https://github.com/nodejs/node/blob/master/deps/v8/src/js/promise.js#L100-L130

function PromiseRace(iterable) {
  if (!IS_RECEIVER(this)) {
    throw %make_type_error(kCalledOnNonObject, PromiseRace);
  }

  // false debugEvent so that forwarding the rejection through race does not
  // trigger redundant ExceptionEvents
  var deferred = %new_promise_capability(this, false);

  // For catch prediction, don't treat the .then calls as handling it;
  // instead, recurse outwards.
  var instrumenting = DEBUG_IS_ACTIVE;
  if (instrumenting) {
    SET_PRIVATE(deferred.reject, promiseForwardingHandlerSymbol, true);
  }

  try {
    for (var value of iterable) {
      // 配列に渡したpromise objectをループ
      // この時点で実際にpromiseの処理を一つずつ行っている。
      // 一番早くresolveされたobjectをすぐに.thenに送る
      var throwawayPromise = this.resolve(value).then(deferred.resolve,
                                                      deferred.reject);
      // For catch prediction, mark that rejections here are semantically
      // handled by the combined Promise.
      if (instrumenting && %is_promise(throwawayPromise)) {
        SET_PRIVATE(throwawayPromise, promiseHandledBySymbol, deferred.promise);
      }
    }
  } catch (e) {
    // 1つでもエラーをcatchしたらrejectする
    %_Call(deferred.reject, UNDEFINED, e);
  }
  return deferred.promise;
}

まとめ

node.js(v8)で実装されているPromiseオブジェクトの概要について解説しました。
非同期処理は、async.jsなどをつかって実装することもできるのですが、Promiseを使うと、成功時・失敗時の処理を分離してかけるので、さらに可読性が上がって良いと思いました。

node.js v7.6系以降でデフォルトで使用可能になるasync/await は、このPromiseをベースにしてつくられたラッパーなので、async/awaitを学ぶ際にはPromiseの仕組みを知っていると理解が深まると思います。
node.jsのasync/awaitに関しては別の記事で取り上げたいと思います。

参考

Promises in Node.js - An Alternative to Callbacks - StrongLoop

azu.github.io