ZooKeeper构建配置服务
ZooKeeper构建配置服务
*配置服务是分布式应用所需要的基本服务之一,它使集群中的机器可以共享配置信息中那些公共的部分。
*简单的说,ZooKeeper可以作为一个具有高可用性的配置存储器,允许分布式应用的参与者检索和更新配置文件。
*使用ZooKeeper中的观察机制,可以建立一个活跃的配置服务,使那些感兴趣的客户端能够获得配置信息修改的通知。
在每个znode上存储一个键值对,ActiveKeyValueStore 提供了从zookeeper服务上写和读取键值方法。
publicclassActiveKeyValueStoreextendsConnectionWatcher{privatestaticfinalCharsetCHARSET=Charset.forName("GBk");privatestaticfinalintMAX_RETRIES=5;privatestaticfinallongRETRY_PERIOD_SECONDS=60;publicvoidwrite(Stringpath,Stringvalue)throwsException{intretries=0;while(true){try{Statstat=zk.exists(path,false);if(stat==null){zk.create(path,value.getBytes(CHARSET),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}else{zk.setData(path,value.getBytes(CHARSET),-1);}}catch(KeeperException.SessionExpiredExceptione){throwe;}catch(KeeperExceptione){if(retries++==MAX_RETRIES){throwe;}TimeUnit.SECONDS.sleep(RETRY_PERIOD_SECONDS);}}}publicStringread(Stringpath,Watcherwatcher)throwsException{byte[]data=zk.getData(path,watcher,null);returnnewString(data,CHARSET);}}
与zookeeper服务创建连接
publicclassConnectionWatcherimplementsWatcher{privatestaticfinalintSESSION_TIMEOUT=5000;protectedZooKeeperzk;privateCountDownLatchconnectedSignal=newCountDownLatch(1);publicvoidconnect(Stringhosts)throwsException{//创建zookeeper实例的时候会启动一个线程连接到zookeeper服务。zk=newZooKeeper(hosts,SESSION_TIMEOUT,this);connectedSignal.await();}//当客户端已经与zookeeper建立连接后,Watcher的process方法会被调用。publicvoidprocess(WatchedEventevent){if(event.getState()==KeeperState.SyncConnected){connectedSignal.countDown();}}publicvoidclose()throwsException{zk.close();}}
ResilientConfigUpdater类提供了管理更新配置信息方法。
publicclassResilientConfigUpdater{publicstaticfinalStringPATH="/config";privateActiveKeyValueStorestore;privateRandomrandom=newRandom();publicResilientConfigUpdater(Stringhosts)throwsException{store=newActiveKeyValueStore();store.connect(hosts);}publicvoidrun()throwsException{while(true){Stringvalue=random.nextInt(100)+"";store.write(PATH,value);System.out.printf("Set%sto%s\n",PATH,value);TimeUnit.SECONDS.sleep(random.nextInt(10));}}publicstaticvoidmain(String[]args)throwsException{while(true){try{ResilientConfigUpdaterupdater=newResilientConfigUpdater("192.168.44.231");updater.run();}catch(KeeperException.SessionExpiredExceptione){//startanewsession}catch(KeeperExceptione){e.printStackTrace();break;}}}}
ConfigWatcher类提供了配置信息变更观察器,在信息修改后会触发显示方法被调用。
publicclassConfigWatcherimplementsWatcher{privateActiveKeyValueStorestore;publicConfigWatcher(Stringhosts)throwsException{store=newActiveKeyValueStore();store.connect(hosts);}publicvoiddisplayConfig()throwsException{Stringvalue=store.read(ConfigUpdater.PATH,this);System.out.printf("Read%sas%s\n",ConfigUpdater.PATH,value);}publicvoidprocess(WatchedEventevent){//TODOAuto-generatedmethodstubif(event.getType()==EventType.NodeDataChanged){try{displayConfig();}catch(Exceptione){System.out.println("Interrupted.Exiting.");Thread.currentThread().interrupt();}}}publicstaticvoidmain(String[]args)throwsException{ConfigWatcherwatcher=newConfigWatcher("192.168.44.231");watcher.displayConfig();Thread.sleep(Long.MAX_VALUE);}}
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。