版权声明:本文为博主原创文章,未经博主允许不得转载。

目录(?)[+]

一、Transaction interface

Transaction接口是基于flume的稳定性考虑的。所有主要的组件(sources、sinks、channels)都必须使用Flume Transaction。我们也可以理解Transaction接口就是flume的事务,sources和sinks的发送数据与接受数据都是在一个Transaction里完成的。


从上图中可以看出,一个Transaction在Channel实现内实现。每一个连接到channel的source和sink都要获取一个Transaction对象。这Sources实际上使用了一个ChannelSelector接口来封装Transaction。存放事件到channel和从channel中提取事件的操作是在一个活跃的Transaction内执行的。

下面是官网例子


[java]view plaincopy

Channelch=newMemoryChannel();

Transactiontxn=ch.getTransaction();

txn.begin();

try{

//ThistryclauseincludeswhateverChanneloperationsyouwanttodo

EventeventToStage=EventBuilder.withBody("HelloFlume!",

Charset.forName("UTF-8"));

ch.put(eventToStage);

//EventtakenEvent=ch.take();

//...

txn.commit();

}catch(Throwablet){

txn.rollback();

//Logexception,handleindividualexceptionsasneeded

//re-throwallErrors

if(tinstanceofError){

throw(Error)t;

}

}finally{

txn.close();

}


上面的代码是一个很简单的Transaction示例,在自定义Source与自定义Sink中都要使用。

二、自定义Sink开发

Sink提取event数据从channel中,然后直接将数据发送到下一个flume agent中或者存储到外部库中。

Sink和channel的关联关系可以在配置文件中配置。有一个SinkRunner实例与每一个已配置的Sink关联,当Flume框架调用SinkRunner.start()方法时候,将创建一个新的线程来驱动这Sink。

这个线程将管理这个Sink的生命周期。Sink需要实现LifecycleAware接口的start()和stop()方法。start()方法用于初始化数据;stop()用于释放资源;process()是从channel中提取event数据和转发数据的核心方法。

这Sink需要实现Configurable接口以便操作配置文件。

下面是官网例子:

[java]view plaincopy

publicclassMySinkextendsAbstractSinkimplementsConfigurable{

privateStringmyProp;

@Override

publicvoidconfigure(Contextcontext){

StringmyProp=context.getString("myProp","defaultValue");

//ProcessthemyPropvalue(e.g.validation)

//StoremyPropforlaterretrievalbyprocess()method

this.myProp=myProp;

}

@Override

publicvoidstart(){

//Initializetheconnectiontotheexternalrepository(e.g.HDFS)that

//thisSinkwillforwardEventsto..

}

@Override

publicvoidstop(){

//Disconnectfromtheexternalrespositoryanddoany

//additionalcleanup(e.g.releasingresourcesornulling-out

//fieldvalues)..

}

@Override

publicStatusprocess()throwsEventDeliveryException{

Statusstatus=null;

//Starttransaction

Channelch=getChannel();

Transactiontxn=ch.getTransaction();

txn.begin();

try{

//ThistryclauseincludeswhateverChanneloperationsyouwanttodo

Eventevent=ch.take();

//SendtheEventtotheexternalrepository.

//storeSomeData(e);

txn.commit();

status=Status.READY;

}catch(Throwablet){

txn.rollback();

//Logexception,handleindividualexceptionsasneeded

status=Status.BACKOFF;

//re-throwallErrors

if(tinstanceofError){

throw(Error)t;

}

}finally{

txn.close();

}

returnstatus;

}

}

下面是测试例子:

[java]view plaincopy

importorg.apache.flume.Channel;

importorg.apache.flume.Context;

importorg.apache.flume.Event;

importorg.apache.flume.EventDeliveryException;

importorg.apache.flume.Transaction;

importorg.apache.flume.conf.Configurable;

importorg.apache.flume.sink.AbstractSink;

publicclassCustom_SinkextendsAbstractSinkimplementsConfigurable{

privateStringmyProp;

@Override

publicvoidconfigure(Contextcontext){

StringmyProp=context.getString("myProp","defaultValue");

//ProcessthemyPropvalue(e.g.validation)

//StoremyPropforlaterretrievalbyprocess()method

this.myProp=myProp;

}

@Override

publicvoidstart(){

//Initializetheconnectiontotheexternalrepository(e.g.HDFS)that

//thisSinkwillforwardEventsto..

}

@Override

publicvoidstop(){

//Disconnectfromtheexternalrespositoryanddoany

//additionalcleanup(e.g.releasingresourcesornulling-out

//fieldvalues)..

}

@Override

publicStatusprocess()throwsEventDeliveryException{

Statusstatus=null;

//Starttransaction

Channelch=getChannel();

Transactiontxn=ch.getTransaction();

txn.begin();

try{

//ThistryclauseincludeswhateverChanneloperationsyouwanttodo

Eventevent=ch.take();

Stringout=newString(event.getBody());

//SendtheEventtotheexternalrepository.

//storeSomeData(e);

System.out.println(out);

txn.commit();

status=Status.READY;

}catch(Throwablet){

txn.rollback();

//Logexception,handleindividualexceptionsasneeded

status=Status.BACKOFF;

//re-throwallErrors

if(tinstanceofError){

throw(Error)t;

}

}finally{

txn.close();

}

returnstatus;

}

}

上面的测试例子只输出事件的BODY信息,这里说明下直接用代码event.getBody().tostring() 输出是乱码。因为所有sink都是在Transaction里完成的,因此自定义开发sink是需要加上Transaction相关设置。

然后是测试配置,这里是自定义的jar 包是flumedev.Custom_Sink。注意,打包之后请放在目录$FLUME_HOME/lib下

[html]view plaincopy

#配置文件:custom_sink_case23.conf

#Namethecomponentsonthisagent

a1.sources=r1

a1.sinks=k1

a1.channels=c1

#Describe/configurethesource

a1.sources.r1.type=syslogtcp

a1.sources.r1.port=50000

a1.sources.r1.bind=192.168.233.128

a1.sources.r1.channels=c1

#Describethesink

a1.sinks.k1.channel=c1

a1.sinks.k1.type=flumedev.Custom_Sink

#a1.sinks.k1.type=logger

#Useachannelwhichbufferseventsinmemory

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

#敲命令

flume-ng agent -cconf -f conf/custom_sink_case23.conf -n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

echo "testcustom_sink" | nc 192.168.233.128 50000

#在启动的终端查看console输出


可以看到数据正常输出。


三、自定义Source开发

Source从外面接收数据并把数据存入Channel中。很少有人用。

下面是官网的例子

[java]view plaincopy

publicclassMySourceextendsAbstractSourceimplementsConfigurable,PollableSource{

privateStringmyProp;

@Override

publicvoidconfigure(Contextcontext){

StringmyProp=context.getString("myProp","defaultValue");

//ProcessthemyPropvalue(e.g.validation,converttoanothertype,...)

//StoremyPropforlaterretrievalbyprocess()method

this.myProp=myProp;

}

@Override

publicvoidstart(){

//Initializetheconnectiontotheexternalclient

}

@Override

publicvoidstop(){

//Disconnectfromexternalclientanddoanyadditionalcleanup

//(e.g.releasingresourcesornulling-outfieldvalues)..

}

@Override

publicStatusprocess()throwsEventDeliveryException{

Statusstatus=null;

//Starttransaction

Channelch=getChannel();

Transactiontxn=ch.getTransaction();

txn.begin();

try{

//ThistryclauseincludeswhateverChanneloperationsyouwanttodo

//Receivenewdata

Evente=getSomeData();

//StoretheEventintothisSource'sassociatedChannel(s)

getChannelProcessor().processEvent(e)

txn.commit();

status=Status.READY;

}catch(Throwablet){

txn.rollback();

//Logexception,handleindividualexceptionsasneeded

status=Status.BACKOFF;

//re-throwallErrors

if(tinstanceofError){

throw(Error)t;

}

}finally{

txn.close();

}

returnstatus;

}

}


测试的话,主要针对Event e 这里进行传输数据,这里就不测试了。

四、自定义Channel开发

官网说待定。

下面是美团网的自定义Channel 开发,下面是链接

http://tech.meituan.com/mt-log-system-optimization.html

……

Flume本身提供了MemoryChannel和FileChannel。MemoryChannel处理速度快,但缓存大小有限,且没有持久化;FileChannel则刚好相反。我们希望利用两者的优势,在Sink处理速度够快,Channel没有缓存过多日志的时候,就使用MemoryChannel,当Sink处理速度跟不上,又需要Channel能够缓存下应用端发送过来的日志时,就使用FileChannel,由此我们开发了DualChannel,能够智能的在两个Channel之间切换。

其具体的逻辑如下:

[java]view plaincopy

/***

*putToMemChannelindicateputeventtomemChannelorfileChannel

*takeFromMemChannelindicatetakeeventfrommemChannelorfileChannel

**/

privateAtomicBooleanputToMemChannel=newAtomicBoolean(true);

privateAtomicBooleantakeFromMemChannel=newAtomicBoolean(true);

voiddoPut(Eventevent){

if(switchon&&putToMemChannel.get()){

//往memChannel中写数据

memTransaction.put(event);

if(memChannel.isFull()||fileChannel.getQueueSize()>100){

putToMemChannel.set(false);

}

}else{

//往fileChannel中写数据

fileTransaction.put(event);

}

}

EventdoTake(){

Eventevent=null;

if(takeFromMemChannel.get()){

//从memChannel中取数据

event=memTransaction.take();

if(event==null){

takeFromMemChannel.set(false);

}

}else{

//从fileChannel中取数据

event=fileTransaction.take();

if(event==null){

takeFromMemChannel.set(true);

putToMemChannel.set(true);

}

}

returnevent;

}

这里要说明下,官网是建议使用file channel,虽然它的效率比较低,但是它能保证数据完整性,而memory channel效率高,但是只能对数据丢失和重复不太敏感的业务使用