Redux-SagaでのWebSocket(Socket.IO)イベント処理

ReactRedux生態系のソケットのような外部イベント処理は、開発に多くの悩みを抱かせます。同じ外部イベントが発生してもアプリの状態によって、異なる方法で処理したり、無視したり、アプリの状態に関係なく常に処理したいこともあります。特に外部イベントがReduxのアクションと接続する場合、Reduxのミドルウェアを通じてアクションがDispatchになったことを把握できる必要性があります。そのため、単純なReduxストアのAPIだけでは、すべてのサービスロジックを処理するのは非常に難しいでしょう。

外部イベント処理と関連したもので、最もよく見られるケースがソケットとの連結です。Redux生態系では、ソケットとReduxがよくマッチするように数多くのライブラリがありますが、ここでは、Redux-Saga(以下Redux-Sagaまたはsaga)を活用したソケットイベント処理について調べます。

あらかじめ知っておくべきこと

  • Redux -Redux-SagaはReduxのミドルウェアです。以前に作成したRedux分析文も時間があれば参考にしよう。
  • Redux-Saga -この記事ではSagaのAPIを活用するため、Sagaの基本的な理解が必要です。

ソケットイベントチャネルを作成する

Redux-Sagaでは、基本的にチャネル、アクションチャネル、イベントチャネルというAPIを提供します。ソケットイベントを連結するには、イベントチャネルAPIを活用できます。

まず、簡単にソケットイベントチャネルを作るファクトリ関数を作成してみよう。

// createSocketChannel.js

import {eventChannel, buffers} from 'redux-saga';
import socket from '../../../mySocket';

const defaultMatcher = () => true;

export function createSocketChannel(eventType, buffer, matcher) {
  return eventChannel(emit => {
    const emitter = message => emit(message);

    socket.on(eventType, emitter);
    return function unsubscribe() {
      socket.off(eventType, emitter);
    }
  }, buffer || buffers.none(), matcher || defaultMatcher);
}

export function closeChannel(channel) {
  if (channel) {
    channel.close();
  }
}

イベントチャネルAPIを簡単にwrappingするだけでソケットチャネルファクトリを作成できます。そしてイベントチャネルを作成するときは必ず、unsubscribe関数を返す必要がある点を忘れないこと。

buffermatcherは続編で詳しく説明するので、ひとまず無視しよう。

ソケットチャネルメッセージの受信

ソケットチャネルファクトリを作成したので、ソケットメッセージを受信してみよう。次の簡単なコードでソケットメッセージを受信できます。

import {fork, take, call} from 'redux-saga/effects';
import {createSocketChannel} from './createSocketChannel';

function* onMessage(type) {
  const channel = yield call(createSocketChannel, type);

  while (true) {
    try {
      const message = yield take(channel);

      console.log(message);
    } catch (e) {
      alert(e.message);
    }
  }
}

export default function* rootSaga() {
  //...
  // "foo"と"bar"というメッセージ受信
  yield fork(onMessage, 'foo');
  yield fork(onMessage, 'bar');
}

Redux-SagaはReduxのミドルウェアであるため、ソケットチャネルからメッセージを受信して、アクションを再度Dispatchするか、他のアクションを待機、エラー処理、あるいは別の非同期処理など、多くの作業を簡単に処理できます。

ソケットチャネルを活用する – 特定アクションのメッセージを受信

ソケットチャネルを通じてメッセージを受信する方法まで調べました。WAIT_FOO_MESSAGE_ONCEというアクションが発生すると、ソケットから“foo”タイプのメッセージを一度だけ受信して処理すると考えてみよう。

import {takeEvery, take, call} from 'redux-saga/effects';
import {createSocketChannel, closeChannel} from './createSocketChannel';
import {WAIT_FOO_MESSAGE_ONCE} from '../../../action/message';

function* waitFooOnce() {
  let channel;

  try {
    channel = yield call(createSocketChannel, 'foo');

    const message = yield take(channel);

    console.log(message);
  } catch (e) {
    alert(e.message);
  } finally {
    closeChannel(channel);
  }
};

export default function* rootSaga() {
  //...
  yield takeEvery(WAIT_FOO_MESSAGE_ONCE, waitFooOnce);
}

takeEveryを通じてWAIT_FOO_MESSAGE_ONCEというアクションにwaitFooOnceというWorkerを登録しました。これでWAIT_FOO_MESSAGE_ONCEアクションが発生するたびに“foo”メッセージを待って処理できます。

もしメッセージ待機作業が終わっていない状態で再び同じアクションが発生した場合、すべて無視して一度だけメッセージを受信したいときは、takeEveryの代わりにtakeLatestを活用できます。

ソケットチャネルを活用する – タイムアウト

ソケットチャネルについて理解したら、timeoutのロジックも非常に簡単に実装できます。10秒間メッセージを待って、メッセージがなかったら、alertを表示してソケットメッセージの待機作業を終了させてみよう。

waitFooOnceを次のように変更できます。

import {delay} from 'redux-saga';
import {takeEvery, take, race, call} from 'redux-saga/effects';
import {createSocketChannel, closeChannel} from './createSocketChannel';
import {WAIT_FOO_MESSAGE_ONCE} from '../../../action/message';

function* waitFooOnce() {
  let channel;

  try {
    channel = yield call(createSocketChannel, 'foo');

    const {timeout, message} = yield race({
      timeout: delay(10000);
      message: take(channel);
    });

    if (timeout) {
      alert('timeout!!');
    } else {
      console.log(message);
    }
  } catch (e) {
    alert(e.message);
  } finally {
    closeChannel(channel);
  }
};

export default function* rootSaga() {
  //...
  yield takeEvery(WAIT_FOO_MESSAGE_ONCE, waitFooOnce);
}

raceeffectを用いてタイムアウトを簡単に実装できました。Redux-Sagaを使わずに、特定アクションが発生したとき、どのソケットメッセージをタイムアウト設定と共に待つように実装できるか考えてみよう。想像するだけで頭が痛いですね。

バッファを活用する

特定アクションが発生し、そのアクションに見合う要求をAjaxで処理した後、ソケットメッセージを待つと考えてみよう。

アクションDispatch => ajax処理 => ソケットメッセージ待機

問題は、Ajax要求とソケットメッセージは両方とも非同期なので順序が狂う可能性があります。もう少し細分化すると、次のような2つの流れが生じることがあります。

  1. アクションDispatch => Ajax要求 => Ajax応答 => ソケットメッセージ
  2. アクションDispatch => Ajax要求 => ソケットメッセージ => Ajax応答

もしAjaxの応答とソケットメッセージが別問題なら構いませんが、ソケットメッセージが先に呼び出されたAjaxの応答結果と関連するメッセージであれば問題が発生します。このような場合に備えてバッファを利用できます。

次のコードは、問題が発生する可能性があります。

channel = yield call(createSocketChannel, 'foo');

const result = yield call(api.requestFoo)

//... resultによる処理

const {timeout, message} = race({
  timeout: delay(10000),
  message: take(channel);
});

//.. messageによってどのように処理

Ajax応答が届く前にソケットメッセージが先に届けば、上記の操作は最終的にメッセージタイムアウトが発生します。ソケットサーバーの立場では、明らかにメッセージを送ったにもかかわらず、メッセージが到着していないように見えるため、無念かもしれません。

バッファを活用しよう。ファクトリを作成するときにバッファを伝達するようにした点を覚えておこう。イベントチャネルのバッファは、buffersというAPIを通じて行うことができます。

channel = yield call(createSocketChannel, 'foo', buffers.sliding(1));

const result = yield call(api.requestFoo);

//... resultによる処理

const {timeout, message} = race({
  timeout: delay(10000),
  message: take(channel);
});

//.. messageによってどのように処理

バッファのサイズを1に設定し、overflowが発生すれば以前のメッセージを捨てるようにslidingAPI(この他にもfixedexpandingdroppingがある)を使用しました。Ajaxの応答が到着する前にメッセージが先に到着しても、バッファに保存され、後にtakeにインポートできるので、メッセージを逃す心配をしなくてもよくなります。

Matcherを活用する

Matcherの使用はバッファよりはるかに簡単です。イベントを受信すると、実際にチャネルに送られるイベントのみろ過するフィルターの役割をします。

メッセージには要求者のIDがあり、アプリケーションは、メッセージの要求者が自分であるときだけ他の処理をすると考えてみよう。次のようにMatcherを作成できます。

const matcher = message => message.headers.requester === MY_ID;

function* onMyMessage() {
  const channel = yield call(createSocketChannel, 'foo', null, matcher);

  while (true) {
    try {
      // ここで受信したメッセージは、すべて自分が要求したメッセージである。
      const message = yield take(channel);

      // ... メッセージ処理
    } catch (e) {
      alert(e.message);
    }
  }
}

// ...

ソケットチャネルでMatcherを通じてフィルタリングを行うことができるため、ソケットメッセージのリスナーから別途の分岐処理ロジックを除去できます。つまりソケットチャネルを1つだけ作成し、リスナーの内部で各メッセージに応じた分岐処理をするよりも、Mathcerを活用して複数のチャネルでそれぞれのサービスロジックを処理するようにしよう。

高速受信されたメッセージに対するバッチ

ReduxとSagaを通じてソケットメッセージに対する処理を簡単に行うことができますが、それでも問題は引き続き発生します。特にソケットメッセージを高速受信し、そのメッセージに応じたアクションをdispatchする場合、レンダリングにボトルネックが発生します。(関連イシュー : “Huge performance issue when dispatching hundreds of actions” 

簡単にアプリをReactでレンダリングする、次のような場合を考えてみよう。

  1. リストにアイテムが1000個ある
  2. ユーザーがリストアイテム1000個の削除要請をする
  3. サーバーはアイテム削除を非同期で処理する
  4. 各アイテム削除に対するソケットメッセージを受信する
  5. アイテム削除メッセージを受信するたびに、リストアイテムの削除アクションをDispatchする
  6. Reactがアイテム削除されたリストのレンダリングを完了する前に、次のアイテムの削除メッセージが到着する
  7. この作業は、約1000回繰り返される

この場合、パフォーマンス低下が避けられません。簡単にもう一度考えてみよう。

  1. 最初のアイテム削除メッセージが到着した
    ->以前の状態リストと次の状態リストを比較する
    –>リストアイテム要素1000個を比較する
    —>削除されたアイテムを外すDOMの更新を実行する
  2. 2番目のアイテム削除メッセージが到着した
    ->以前の状態リストと次の状態リストを比較する
    –>リストアイテム要素999個を比較する
    —>削除されたアイテムを外すDOMの更新を実行する
  3.  

アイテム1000個を削除すると、DOMのレンダリング1000回が無条件で要求されます。そして、Reactリストコンポーネントのみ各メッセージに基づいて各アイテムの要素を比較するReconciliationもあります。もちろんReactやブラウザが内部的に何らかの一括処理を実行することもできますが、オペレーション自体が1000回のDOMレンダリングと約50,000回以上のReconciliationを要求することは変わりません(+ReduxのすべてのChangeリスナーも1000回ずつ実行される) 。最終的に1000個のメッセージが2〜3秒以内にすべて到着するとパフォーマンスの問題が発生します。

次のコードを見てみよう。普段は特に問題ありませんが、多くのタスクを処理する際に大きな問題となります。

// Action creator
export function deleteItems(...ids) {
  return {
    type: DELETE_ITEMS,
    ids
  };
}
// Saga
const channel = yield call(createSocketChannel, 'deleteItem');

while (true) {
  const message = yield take(channel);

  yield put(deleteItems(message.itemId));
}

パフォーマンスの問題を解決するためにまず考える方法が、ThrottleやDebounceがこのような削除作業には適していないということです。いずれにせよ結果的には1000個のアイテム削除を処理してユーザーに表示すべきところ、ThrottleやDebounceは意図的にメッセージを捨てる作業であるため、その結果を正確に表示できません。

したがって、このような場合には約100〜200ms(アプリごとに待機時間は異なるので、適正時間は必ず測定すること)程度はメッセージを集めて、一度にバッチ処理する方法が必要です。

バッファを活用して、メッセージを集めてみよう。

// Saga

const channel = yield call(createSocketChannel, 
  'deleteItem',
   buffers.expanding(50) // 任意で50を指定した。各アプリに合わせて設定する。
);

while (true) {
  const messages = yield flush(channel);
  const ids = messages.map(message => message.itemId);

  if (ids.length) {
    yield put(deleteItems(...ids));
  }

  yield delay(200); // 任意に指定した待機時間。各アプリに合わせて設定する。
}

このようにバッファを活用すれば、約200msのメッセージを集めてdeleteItemsのバッチ処理ができます。また、ThrottleやDebounceとは異なり、メッセージを意図的に捨てることはありません。

そしてもう1つ重要な点は、if (ids.length)構文です。もしこの構文がないと、メッセージを受信できなくても200msごとに常にdispatchのタスクを実行するため、開発者に多大な混乱を与えかねません。

おわりに

Reduxを使用するアプリケーションでソケットのような外部イベントを一緒に処理するのはなかなか難しいでしょう。前述したRedux生態系のライブラリやRedux-Sagaを活用すると、もう少し簡単に外部イベントを処理できます。

今回、Redux-Sagaと内部のイベントチャネル、バッファ、Matcher APIを活用して、ソケットイベントを簡単に処理することができました。しかしサンプルや説明が思ったより多くはなく、最初は多くの困難を強いられました。(Redux-Sagaの公式文書は非常によく書かれていますが、イベントチャネルについては多くありません)。この記事でRedux-Sagaでのソケットイベント処理が少しでも役立つことを願っています。

TOAST Meetup 編集部

TOASTの技術ナレッジやお得なイベント情報を発信していきます
pagetop