
Flume (TCP/Avro source + Memory channel + logger sink) - Integrating with Log4j

Tomining 2016. 1. 20. 21:42
Flume configuration (sample-flume-conf.properties)
agent.sources = testSource
agent.channels = testChannel
agent.sinks = testSink

# For each one of the sources, the type is defined
agent.sources.testSource.type = syslogtcp
agent.sources.testSource.host = 127.0.0.1
agent.sources.testSource.port = 9999
agent.sources.testSource.channels = testChannel

# Each sink's type must be defined
agent.sinks.testSink.type = logger
agent.sinks.testSink.channel = testChannel

# Each channel's type is defined.
agent.channels.testChannel.type = memory
agent.channels.testChannel.capacity = 1000
agent.channels.testChannel.transactionCapacity = 100
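To check the syslogtcp source on its own, before Log4j is involved, you can push a syslog-formatted line to it over plain TCP. The sketch below assumes RFC 3164-style messages, one per line and terminated by a newline; SyslogTcpSender, format and send are hypothetical names, not part of Flume.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical test helper (not part of Flume): writes RFC 3164-style syslog lines
// to a syslogtcp source over plain TCP.
class SyslogTcpSender {
    // RFC 3164 timestamp such as "Jan 20 21:42:00"; single-digit days are space-padded ("Jan  5").
    private static final DateTimeFormatter TIMESTAMP =
            DateTimeFormatter.ofPattern("MMM ppd HH:mm:ss", Locale.ENGLISH);

    // PRI = facility * 8 + severity, e.g. user (1) * 8 + notice (5) = 13.
    static String format(int facility, int severity, LocalDateTime time,
                         String hostname, String message) {
        int pri = facility * 8 + severity;
        // Each event is one line, terminated by '\n'.
        return "<" + pri + ">" + TIMESTAMP.format(time) + " " + hostname + " " + message + "\n";
    }

    // Opens a TCP connection, writes the line and closes; this protocol expects no reply.
    static void send(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write(line.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }
}
```

For example, SyslogTcpSender.send("127.0.0.1", 9999, SyslogTcpSender.format(1, 5, LocalDateTime.now(), "myhost", "Hello World")) should make an event show up in the logger sink's console output.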

Running Flume
flume-ng agent -n agent -c /home1/irteam/apps/flume/conf -f /home1/irteam/apps/flume/conf/sample-flume-conf.properties -Dflume.root.logger=INFO,console
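Before running the client, it can help to confirm that the agent is actually accepting connections on port 9999. The sketch below is a plain-Java TCP probe; PortProbe and isListening are hypothetical names. A successful connect only shows that something is listening; it cannot tell a syslogtcp source apart from an avro source, which turns out to matter later in this post.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether something accepts TCP connections on host:port.
class PortProbe {
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws if the connection is refused or does not complete in time
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

Usage: PortProbe.isListening("127.0.0.1", 9999, 1000) returns true once the agent above has started its source.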


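Sending logs from a client to Flume through Log4j
1. First, flume-ng-sdk-1.4.0.jar must be on the classpath. Add it as a Maven dependency as shown below. (The Log4jAppender class itself ships in a separate artifact, flume-ng-log4jappender with groupId org.apache.flume.flume-ng-clients, which depends on flume-ng-sdk; if the appender class cannot be found, add that artifact as well.)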
<dependency>
     <groupId>org.apache.flume</groupId>
     <artifactId>flume-ng-sdk</artifactId>
     <version>1.4.0</version>
     <type>jar</type>
     <scope>compile</scope>
</dependency>
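If you are unsure whether the jars actually made it onto the runtime classpath, a quick check is to try loading the relevant classes by name. ClasspathCheck is a hypothetical helper; the class names to check would be the appender from log4j.xml and the RPC client it uses.

```java
// Hypothetical helper: reports whether a class can be loaded from the current classpath.
class ClasspathCheck {
    static boolean isPresent(String className) {
        try {
            // initialize = false: load the class without running its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // missing class, or a class whose own dependencies are missing
            return false;
        }
    }
}
```

For this setup you would call it with "org.apache.flume.clients.log4jappender.Log4jAppender" and "org.apache.flume.api.NettyAvroRpcClient"; both should report true.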

2. Configure the appender in log4j.xml
<appender name="flume-appender" class="org.apache.flume.clients.log4jappender.Log4jAppender">
     <param name="hostname" value="127.0.0.1"/>
     <param name="port" value="9999"/>
</appender>

<logger name="flume" additivity="false">
     <level value="INFO"/>
     <appender-ref ref="flume-appender"/>
</logger>

3. Log a message.
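import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;

@Test
public void flume전송_테스트() {
     // "flume" must match the logger name configured in log4j.xml
     Log log = LogFactory.getLog("flume");
     log.info("Hello World");
}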
Running the code above produced the error below. Invalid data??

org.apache.flume.FlumeException: Flume append() failed. Exception follows.
     at org.apache.flume.clients.log4jappender.Log4jAppender.append(Log4jAppender.java:170)
     at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
     at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
     at org.apache.log4j.Category.callAppenders(Category.java:206)
     at org.apache.log4j.Category.forcedLog(Category.java:391)
     at org.apache.log4j.Category.log(Category.java:856)
     at org.apache.commons.logging.impl.Log4JLogger.info(Log4JLogger.java:199)
     at com.nhncorp.umon.iv.VideoPlayStatusTypeTest.flume전송_테스트(VideoPlayStatusTypeTest.java:27)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
     at java.lang.reflect.Method.invoke(Method.java:597)
     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
     at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
     at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
     at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
     at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 127.0.0.1, port: 9999 }: Failed to send event
     at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:229)
     at org.apache.flume.clients.log4jappender.Log4jAppender.append(Log4jAppender.java:163)
     ... 30 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 127.0.0.1, port: 9999 }: Handshake timed out after 20000 ms
     at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:262)
     at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:217)
     ... 31 more
Caused by: java.util.concurrent.TimeoutException
     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
     at java.util.concurrent.FutureTask.get(FutureTask.java:91)
     at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:260)
     ... 32 more
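The innermost cause in the trace is a java.util.concurrent.TimeoutException thrown from FutureTask.get inside NettyAvroRpcClient.append: the client waited 20000 ms for a handshake reply that never came. The stdlib-only sketch below reproduces that failure mode with plain sockets, using a server that reads bytes but never answers (roughly what a source that does not speak the client's protocol looks like from the client side). It illustrates the mechanism only; HandshakeTimeoutDemo is a hypothetical class, not Flume's client code.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustration only (not Flume code): a client that expects a handshake reply talks to a
// server that reads bytes but never answers, so the client's wait ends in a TimeoutException.
class HandshakeTimeoutDemo {

    // Returns true if waiting for the reply timed out, false if any reply arrived.
    static boolean clientTimesOut(long timeoutMillis) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) {
            // "Silent" server: accepts one connection and discards whatever it receives.
            pool.execute(() -> {
                try (Socket conn = server.accept(); InputStream in = conn.getInputStream()) {
                    while (in.read() != -1) {
                        // discard input; never write a response
                    }
                } catch (IOException ignored) {
                    // connection closed; nothing to do
                }
            });

            try (Socket client = new Socket(loopback, server.getLocalPort())) {
                OutputStream out = client.getOutputStream();
                out.write("handshake-request".getBytes(StandardCharsets.UTF_8));
                out.flush();

                // Wait for a one-byte reply on another thread, bounded by timeoutMillis.
                Callable<Integer> readReply = () -> client.getInputStream().read();
                Future<Integer> reply = pool.submit(readReply);
                try {
                    reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true;
                }
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
```

HandshakeTimeoutDemo.clientTimesOut(300) returns true: the bytes are accepted and consumed, but no reply ever arrives, which matches the "Handshake timed out" message rather than any complaint about invalid data.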


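I'm not sure of every detail, but the trace suggests that the Log4j appender sends events to Flume over Avro RPC: the call goes through NettyAvroRpcClient, which gave up after waiting 20000 ms for an Avro handshake reply that a syslogtcp source never sends. So I changed the Flume source type to avro, and now it works as expected. Note that the avro source takes its listen address from bind, not host.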
agent.sources = testSource
agent.channels = testChannel
agent.sinks = testSink

# For each one of the sources, the type is defined
agent.sources.testSource.type = avro
agent.sources.testSource.bind = 127.0.0.1
agent.sources.testSource.port = 9999
agent.sources.testSource.channels = testChannel

# Each sink's type must be defined
agent.sinks.testSink.type = logger
agent.sinks.testSink.channel = testChannel

# Each channel's type is defined.
agent.channels.testChannel.type = memory
agent.channels.testChannel.capacity = 1000
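# In my testing, a value that was too small caused the data reaching the sink to be cut off.
# (The logger sink also prints only the first 16 bytes of each event body by default, which can look like truncation too.)
agent.channels.testChannel.transactionCapacity = 100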
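Flume reads the agent configuration file with java.util.Properties, and in that format # (or !) starts a comment only when it is the first non-blank character on a line. A trailing # note after a value therefore becomes part of the value, which is why notes in this file belong on their own line. The small stdlib sketch below shows the effect; PropertiesCommentDemo and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Loads properties text the same way a .properties file is read, to show that
// '#' starts a comment only at the beginning of a line.
class PropertiesCommentDemo {
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    // Returns true if the value parses as a plain integer.
    static boolean isInteger(String value) {
        try {
            Integer.parseInt(value.trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```

For example, parsing "agent.channels.testChannel.transactionCapacity = 100 #note" yields the value "100 #note", which isInteger reports as not a valid integer.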