2013年5月24日金曜日

JBossDataGridのリスナーのコールバック関数を呼び出す

JBossテクニカルチームの寺田です。

今回はJBossDataGirdのリスナーを使ってデータやキャッシュの状態が変更された際のイベントを拾ってみたいと思います。
JBossDataGridの中でキャッシュおよびキャッシュマネージャの状態やデータの状態が変わった際にイベントが発生するのですが、 このリスナー機能を使うにはイベントを処理するためのコールバック関数を用意しておき、あらかじめキャッシュおよびキャッシュマネージャに登録しておきます。
イベント発生時に、登録されたコールバック関数が呼び出されるという仕組みとなっています。
また、リスナー機能はRDB製品によく実装されているトリガーと似た仕組みとなっており、変更前と後の値を取得することができ、ロジックを追加できます。

こういったリスナー機能を調べるために、infinispan-quickstartのclusterd-cacheというサンプルがありこのサンプルの中でリスナー機能を使っているので、追加修正しながらイベントを発生させてみます。

Agenda

LoggingListenerクラスを修正

infinispanのサンプルのclustered-cacheの中で、リスナーを利用しています。今回は、このサンプルをclustered-cache-listenerという名前でコピーしていろいろなイベントを発生させてリスナーでハンドリングしてみます。 以下は今回修正したLoggingListenerの中身です。
@Listener
public class LoggingListener {

   private Log log = LogFactory.getLog(LoggingListener.class);
   @CacheEntryCreated
   public void observeAdd(CacheEntryCreatedEvent event) {
         log.infof("### Cache entry with key %s added in cache %s", event.getKey(), event.getCache());
   }
   @CacheEntryRemoved
   public void observeRemove(CacheEntryRemovedEvent event) {
      log.infof("### Cache entry with key %s removed in cache %s", event.getKey(), event.getCache());
   }
   @CacheEntryModified
   public void observeModified(CacheEntryModifiedEvent event) {
      log.infof("### Cache entry with key %s modified in cache %s", event.getKey(), event.getCache());
   }
   @CacheEntryEvicted
   public void observeEvicted(CacheEntryEvictedEvent event) {
      log.infof("### Cache entry with key %s evicted in cache %s", event.getKey(), event.getCache());
   }
   @CacheEntryInvalidated
   public void observeEvicted(CacheEntryInvalidatedEvent event) {
      log.infof("### Cache entry with key %s invalidated in cache %s", event.getKey(), event.getCache());
   }
   @CacheEntryLoaded
   public void observeEvicted(CacheEntryLoadedEvent event) {
      log.infof("### Cache entry with key %s loaded in cache %s", event.getKey(), event.getCache());
   }
   @CacheEntryActivated
   public void observeActivated(CacheEntryActivatedEvent event) {
      log.infof("### Cache entry with key %s activated in cache %s", event.getKey(), event.getCache());
   }
   @CacheEntryPassivated
   public void observePassivated(CacheEntryPassivatedEvent event) {
      log.infof("### Cache entry with key %s passivated in cache %s", event.getKey(), event.getCache());
   }
   @CacheEntryVisited
   public void observePassivated(CacheEntryVisitedEvent event) {
      log.infof("### Cache entry with key %s visited in cache %s", event.getKey(), event.getCache());
   }
   @ViewChanged
   public void observeViewChanged(ViewChangedEvent event) {
      log.infof("###Cache entry View Changed ");
   }
   @DataRehashed
   public void observeDataRehashed(DataRehashedEvent event) {
      log.infof("###Cache entry Data Rehashed ");
   }
   @TopologyChanged
   public void observeTopologyChanged(TopologyChangedEvent event) {
      log.infof("###Cluster Topology Changed ");
   }
   @TransactionCompleted
   public void observeTransactionCompleted(TransactionCompletedEvent event) {
      log.infof("###Transaction Completed ");
   }
   @TransactionRegistered
   public void observeTransactionRegistered(TransactionRegisteredEvent event) {
      log.infof("###Transaction Registered ");
   }
   @CacheStarted
   public void observeCacheStarted(CacheStartedEvent event) {
      log.infof("###Cache started cache ");
   }
   @CacheStopped
   public void observeCacheStopped(CacheStoppedEvent event) {
      log.infof("### CacheStopped cache ");
   }
   @Merged
   public void observeCacheStarted(MergeEvent event) {
      log.infof("###Cache merged ");
   }
// add end
}

Node0~2クラスを修正

もともとこのサンプルではキャッシュのイベントだけハンドリングしていたため、キャッシュにしかリスナーを登録していませんが、今回はキャッシュマネージャのイベントもハンドリングしたいため、以下のようにキャッシュマネージャにリスナーを登録するように修正します。
    EmbeddedCacheManager cm = getCacheManager();
    cm.addListener(new LoggingListener());

AbstractNodeクラスを修正

各ノードクラスの基底クラスとなっているAbstractNodeクラスでキャッシュの設定を行っているが、今回いろいろイベントを発生させるために以下のようにプロパティーを追加した。
以下はそのコード部分の抜粋です。 赤字部分で次にあげる振る舞いを設定しています。
・evictionを発生させるために、メモリ上に5エントリーしか持たないようにする
・passivationを発生させるために、passivationを有効(=true)にした。
・loadイベントを発生させるために、ファイルベースの永続ストアを用意した。
   private static EmbeddedCacheManager createCacheManagerProgramatically() {
      return new DefaultCacheManager(
            GlobalConfigurationBuilder.defaultClusteredBuilder()
                  .transport().addProperty("configurationFile", "jgroups.xml")
                  .build(),
            new ConfigurationBuilder()
                  .clustering()
                     .cacheMode(CacheMode.DIST_SYNC)
                     .hash().numOwners(2)
                     .eviction().strategy(LIRS).maxEntries(5)
                     .loaders().passivation(true).addFileCacheStore().location("store")
                  .build()
      );
   }

Node2クラスを修正

LoggingListnerクラスにエントリーを操作するコードを追加して発生するイベントの種類を増やします。 赤字部分が追加したコードです。
以下のような処理を追加しています。
・永続ストアからロードする。
・エントリーの削除
・エントリーの変更
・キャッシュのストップ
・キャッシュのスタート
      // load check
      for (int i = 0; i < 20; i++)
        System.out.println("load check key=key" + i + "  value=" + cache.get("key" + i));

    // Put a few entries into the cache so that we can see them distribution to the other nodes
      for (int i = 0; i < 20; i++)
         cache.put("key" + i, "value" + i);
    System.out.println("key1 value = " + cache.get("key1"));
    cache.remove("key1");
    String key2_value = cache.get("key2");
    cache.put("key2",key2_value + " modifyvalue");
    cache.stop();
    cache.start();
    
   }

出力内容の確認

Node2とNode1の出力を追っていくと、分散構成なのでloadされたりvisitedされたりされる イベントやキーがノードでばらばらとなり、ノードごとにkeyとvalueのセットがaddされたりとい ったことが発生している。キャッシュのエントリーを5つにしたのでメモリ上 に読み込まれたデータが4つになり5つ目をロードしようとするとpassivated、evictというイベントが発生し、 メモリ上からエントリーが追い出されていることがわかる。
Node2の出力
INFO: ### Cache entry with key key0 visited in cache Cache 'Demo'@machine-3527
2013/04/26 18:32:12 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key0 visited in cache Cache 'Demo'@machine-3527
load check key=key0  value=value0
load check key=key1  value=null
2013/04/26 18:32:12 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key2 visited in cache Cache 'Demo'@machine-3527
2013/04/26 18:32:12 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key2 visited in cache Cache 'Demo'@machine-3527
load check key=key2  value=value2 modifyvalue
2013/04/26 18:32:12 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key3 visited in cache Cache 'Demo'@machine-3527
2013/04/26 18:32:12 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key3 visited in cache Cache 'Demo'@machine-3527
load check key=key3  value=value3
2013/04/26 18:32:12 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeEvicted
INFO: ### Cache entry with key key4 loaded in cache Cache 'Demo'@machine-3527
2013/04/26 18:32:12 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeActivated
INFO: ### Cache entry with key key4 activated in cache Cache 'Demo'@machine-3527

2013/04/26 18:32:12 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeEvicted

(・・中略・・)
INFO: ### Cache entry with key key30 visited in cache Cache 'Demo'@machine-3527
2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key30 visited in cache Cache 'Demo'@machine-3527
key30 is exist
2013/04/26 18:32:13 org.infinispan.eviction.PassivationManagerImpl passivateAll
INFO: ISPN000029: Passivating all entries to disk
2013/04/26 18:32:13 org.infinispan.eviction.PassivationManagerImpl passivateAll
INFO: ISPN000030: Passivated 6 entries in 15 milliseconds
Node1の出力
INFO: ISPN000078: Starting JGroups Channel
2013/04/26 18:31:57 org.infinispan.remoting.transport.jgroups.JGroupsTransport v
iewAccepted
INFO: ISPN000094: Received new cluster view: [machine-37097|1] [machine-37097, machine-43461]
2013/04/26 18:31:57 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeViewChanged
INFO: ###Cache entry View Changed
2013/04/26 18:31:57 org.infinispan.remoting.transport.jgroups.JGroupsTransport s
tartJGroupsChannelIfNeeded
INFO: ISPN000079: Cache local address is machine-43461, physical addresses are [
10.43.1.225:3887]
2013/04/26 18:31:57 org.infinispan.factories.GlobalComponentRegistry start
INFO: ISPN000128: Infinispan version: Infinispan 'Brahma' 5.1.0.CR2
2013/04/26 18:31:57 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeCacheStarted
INFO: ###Cache started cache
2013/04/26 18:31:57 org.infinispan.config.ConfigurationValidatingVisitor visitCa
cheLoaderManagerConfig
WARN: ISPN000149: Fetch persistent state and purge on startup are both disabled,
 cache may contain stale entries on startup
2013/04/26 18:31:57 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeCacheStarted
INFO: ###Cache started cache
2013/04/26 18:32:10 org.infinispan.remoting.transport.jgroups.JGroupsTransport v
iewAccepted
INFO: ISPN000094: Received new cluster view: [machine-37097|2] [machine-37097, ma
chine-43461, machine-3527]
2013/04/26 18:32:10 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeViewChanged
INFO: ###Cache entry View Changed
2013/04/26 18:32:10 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeDataRehashed
INFO: ###Cache entry Data Rehashed
2013/04/26 18:32:10 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeTopologyChanged
INFO: ###Cluster Topology Changed
2013/04/26 18:32:10 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeTopologyChanged
INFO: ###Cluster Topology Changed
2013/04/26 18:32:10 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeDataRehashed
INFO: ###Cache entry Data Rehashed
2013/04/26 18:32:11 org.infinispan.quickstart.clusteredcache.util.ClusterValidat
ion checkReplicationSeveralTimes
INFO: Cluster formed successfully!
2013/04/26 18:32:12 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key10 visited in cache Cache 'Demo'@machine-43461

2013/04/26 18:32:12 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key10 visited in cache Cache 'Demo'@machine-43461

2013/04/26 18:32:12 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key14 visited in cache Cache 'Demo'@machine-43461

2013/04/26 18:32:12 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key14 visited in cache Cache 'Demo'@machine-43461

2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key17 visited in cache Cache 'Demo'@machine-43461

2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key17 visited in cache Cache 'Demo'@machine-43461

2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeAdd
INFO: ### Cache entry with key key2 added in cache Cache 'Demo'@machine-43461
2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeAdd
INFO: ### Cache entry with key key2 added in cache Cache 'Demo'@machine-43461
2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeModified
INFO: ### Cache entry with key key2 modified in cache Cache 'Demo'@machine-43461

2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeModified
INFO: ### Cache entry with key key2 modified in cache Cache 'Demo'@machine-43461

2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key17 passivated in cache Cache 'Demo'@machine-43
461
2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key17 passivated in cache Cache 'Demo'@machine-43
461
2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeEvicted
INFO: ### Cache entry with key key17 evicted in cache Cache 'Demo'@machine-43461

2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeEvicted

・・・(中略)・・

INFO: ### Cache entry with key key17 passivated in cache Cache 'Demo'@machine-43
461
2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observePassivated
INFO: ### Cache entry with key key17 passivated in cache Cache 'Demo'@machine-43
461
2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeEvicted
INFO: ### Cache entry with key key17 evicted in cache Cache 'Demo'@machine-43461

2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeTopologyChanged
INFO: ###Cluster Topology Changed
2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeTopologyChanged
INFO: ###Cluster Topology Changed
2013/04/26 18:32:13 org.infinispan.quickstart.clusteredcache.util.LoggingListene
r observeDataRehashed
INFO: ###Cache entry Data Rehashed

イベントのまとめ

全てのイベントをハンドリングできたわけではないですが、試行錯誤した結果をもとに、どのようなイベントかを以下にまとめました。
EvictとPassivate、visitedとactivateとloadなど同じタイミングで発生しそうなイベントもありますが、それらは同じタイミングで発生します。
また、イベントはJBossDataGrid6.0でのものを元に調査しています。
イベント  説明 
CacheEntryCreatedEvent 新しいエントリが追加された際に発生。新規にput()メソッドに対応 
CacheEntryRemovedEvent エントリが削除された際に発生。remove()メソッドに対応 
CacheEntryModifiedEvent エントリーが変更されたとき。put()メソッドでの変更に対応。変更する以外でも発生している。 
CacheEntryVisitedEvent エントリーが取得されたとき。get()メソッドで取得されたときに発生 
ViewChangedEvent クラスタの参加メンバが変化したときに発生 
CacheEntryEvictedEvent メモリ上にある要素がメモリから追い出された場合に発生 
CacheStartedEvent キャッシュが開始された際に発生 
CacheStoppedEvent キャッシュの終了された際に発生。今回は発生させることができず。 
CacheEntryLoadedEvent キャッシュのエントリを永続ストアからロードしたときに発生 
CacheEntryActivatedEvent メモリに読み込まれ、永続ストアから消されたときに発生 
CacheEntryPassivatedEvent メモリから追い出され、永続ストアに書き込まれたときに発生。キャッシュを定義する際に、loaders().passivation(true)とした場合にPassivateするようになる。 
CacheEntryInvalidatedEvent キャッシュのエントリが無効になったときに発生。今回発生させることが出来ず。Manik surtaniの説明によると、キャッシュをInvalidation modeに設定した際に、キャッシュの属性を変更した場合に、別のキャッシュ上にあるエントリーが無効になるがその場合に発生する。
http://architects.dzone.com/articles/infinispan-power-user-cache-mode 
DataRehashedEvent データのハッシュが再読み込みされた際に発生?キャッシュが開始されたタイミングでは発生します
TopologyChanged クラスタの構成が変わったときに発生。CacheMode#DIST_SYNC or Configuration.CacheMode#DIST_ASYNCに設定された場合に発生します。キャッシュが開始されたタイミングでは発生しているようです
TransactionCompletedEvent トランザクション完了したときに発生。
TransactionRegisteredEvent トランザクションが開始された際に発生。

非常に簡単ですが、リスナーの機能について見てみましたが、キャッシュ、キャッシュマネージャ、データの状態の変化のタイミングで何かをする必要があるときにはこのリスナーの機能を使って処理することになります。
リスナー機能の性質上、JBossDataGridを使って少し複雑なことをしようとする際には、これらの知識は必須となりそうです。そのため、もう少し深く調べていく必要がありそうですが、それはまた別の機会とさせていただきます。 お付き合いいただきありがとうございました。

参考

参考のために修正したコードをリポジトリにアップしました。
https://github.com/SIOS-OSS/clustered-cache-listener

0 件のコメント:

コメントを投稿