Flume (TCP/Avro source + Memory channel + logger sink) - Integrating with Log4j
Tomining
2016. 1. 20. 21:42
Flume configuration
agent.sources = testSource
agent.channels = testChannel
agent.sinks = testSink
# For each one of the sources, the type is defined
agent.sources.testSource.type = syslogtcp
agent.sources.testSource.host = 127.0.0.1
agent.sources.testSource.port = 9999
agent.sources.testSource.channels = testChannel
# Each sink's type must be defined
agent.sinks.testSink.type = logger
agent.sinks.testSink.channel = testChannel
# Each channel's type is defined.
agent.channels.testChannel.type = memory
agent.channels.testChannel.capacity = 1000
agent.channels.testChannel.transactionCapacity = 100
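For comparison, the syslogtcp source configured above reads plain syslog text over TCP and ends each event at a newline; it does not speak Avro. The JDK-only sketch below shows what a client for that kind of source could look like. `SyslogTcpSketch`, `syslogLine` and `send` are hypothetical names, and the RFC 3164 style header (`<PRI>timestamp host message`) is an assumed message format, not something taken from this setup.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical client for a syslogtcp source: plain text over TCP, one event per line.
public class SyslogTcpSketch {

    /** Builds an RFC 3164 style line: "<PRI>timestamp host message\n". */
    public static String syslogLine(int facility, int severity, String timestamp,
                                    String host, String message) {
        int pri = facility * 8 + severity; // e.g. user (1) * 8 + notice (5) = 13
        // The trailing '\n' is what ends the event on the receiving side.
        return "<" + pri + ">" + timestamp + " " + host + " " + message + "\n";
    }

    /** Writes one line to host:port over a plain TCP socket (no Avro handshake involved). */
    public static void send(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write(line.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes the agent configured above is listening on 127.0.0.1:9999.
        String line = syslogLine(1, 5, "Jan 20 21:42:00", "localhost", "Hello World");
        send("127.0.0.1", 9999, line);
    }
}
```

The point of the sketch is the framing: a syslogtcp source expects newline-terminated text, which is a different wire format from the one the Log4j appender uses.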
Running Flume
flume-ng node -n agent -c /home1/irteam/apps/flume/conf -f /home1/irteam/apps/flume/conf/sample-flume-conf.properties -Dflume.root.logger=INFO,console
Sending logs from a client to Flume through Log4j
1. First, flume-ng-sdk-1.4.0.jar must be on the classpath. Add the Maven dependency as follows.
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sdk</artifactId>
    <version>1.4.0</version>
    <type>jar</type>
    <scope>compile</scope>
</dependency>
2. Configure the appender in log4j.xml
<appender name="flume-appender" class="org.apache.flume.clients.log4jappender.Log4jAppender">
    <param name="hostname" value="127.0.0.1"/>
    <param name="port" value="9999"/>
</appender>
<logger name="flume" additivity="false">
    <level value="INFO"/>
    <appender-ref ref="flume-appender"/>
</logger>
3. Write a log message.
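import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
public class VideoPlayStatusTypeTest {
    @Test
    public void flume전송_테스트() {
        Log log = LogFactory.getLog("flume"); // the "flume" logger defined in log4j.xml
        log.info("Hello World");
    }
}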
Running the code above produced the following error. Invalid data??
org.apache.flume.FlumeException: Flume append() failed. Exception follows.
at org.apache.flume.clients.log4jappender.Log4jAppender.append(Log4jAppender.java:170)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.apache.commons.logging.impl.Log4JLogger.info(Log4JLogger.java:199)
at com.nhncorp.umon.iv.VideoPlayStatusTypeTest.flume전송_테스트(VideoPlayStatusTypeTest.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 127.0.0.1, port: 9999 }: Failed to send event
at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:229)
at org.apache.flume.clients.log4jappender.Log4jAppender.append(Log4jAppender.java:163)
... 30 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 127.0.0.1, port: 9999 }: Handshake timed out after 20000 ms
at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:262)
at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:217)
... 31 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
at java.util.concurrent.FutureTask.get(FutureTask.java:91)
at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:260)
... 32 more
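The bottom of the trace is a java.util.concurrent.TimeoutException from FutureTask.get: the client apparently opened the TCP connection to port 9999 without trouble, but never received an answer to its handshake within 20000 ms. The JDK-only sketch below reproduces that failure mode in miniature. It is an illustration, not Flume code: a local server (standing in for a listener that does not speak the client's protocol) accepts the connection and swallows the bytes without replying, while the client waits on a Future with a timeout. `HandshakeTimeoutSketch` and `handshakeTimesOut` are hypothetical names.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustration only (not Flume code): the client connects fine, sends a "handshake",
// and waits for a reply that a silent server never sends.
public class HandshakeTimeoutSketch {

    /** Returns true if no reply arrived within timeoutMs. */
    public static boolean handshakeTimesOut(long timeoutMs) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            // Silent server: accept, drain whatever arrives, never write anything back.
            pool.submit(() -> {
                try (Socket peer = server.accept()) {
                    InputStream in = peer.getInputStream();
                    byte[] buf = new byte[256];
                    while (in.read(buf) != -1) {
                        // discard the bytes; no response is produced
                    }
                }
                return null;
            });
            try (Socket client = new Socket(loopback, server.getLocalPort())) {
                // The TCP connect succeeds, so there is no "connection refused" here.
                client.getOutputStream().write("HANDSHAKE".getBytes(StandardCharsets.US_ASCII));
                client.getOutputStream().flush();
                // Wait for the first reply byte on another thread, bounded by a timeout.
                Future<Integer> reply = pool.submit(() -> client.getInputStream().read());
                try {
                    reply.get(timeoutMs, TimeUnit.MILLISECONDS);
                    return false; // the server answered in time
                } catch (TimeoutException e) {
                    return true; // same exception type as the root cause in the Flume trace
                }
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("handshake timed out: " + handshakeTimesOut(500));
    }
}
```

With a silent peer, `handshakeTimesOut(500)` returns true after roughly half a second. The takeaway is that a protocol mismatch on an open port surfaces as a timeout, not as an explicit protocol error.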
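The trace points to the cause: the Log4j appender sends events through NettyAvroRpcClient, i.e. over Avro RPC, while the agent was listening with a syslogtcp source, which does not speak Avro. The Avro handshake therefore never completed and timed out after 20000 ms. After I changed the Flume source type to avro (an avro source takes its listen address from bind instead of host), it worked correctly.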
agent.sources = testSource
agent.channels = testChannel
agent.sinks = testSink
# For each one of the sources, the type is defined
agent.sources.testSource.type = avro
agent.sources.testSource.bind = 127.0.0.1
agent.sources.testSource.port = 9999
agent.sources.testSource.channels = testChannel
# Each sink's type must be defined
agent.sinks.testSink.type = logger
agent.sinks.testSink.channel = testChannel
# Each channel's type is defined.
agent.channels.testChannel.type = memory
agent.channels.testChannel.capacity = 1000
# When this size is small, the data delivered to the sink gets cut off.
agent.channels.testChannel.transactionCapacity = 100
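Flume reads this file with java.util.Properties, and in that format `#` starts a comment only when it is the first non-blank character of a line. A `#` later in a line is ordinary text: a setting appended to a comment line is silently dropped, and a note appended after a value becomes part of the value (so `100 #...` would no longer parse as a number). The JDK-only sketch below demonstrates both cases; `FlumeConfCommentSketch` is a hypothetical name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Shows how java.util.Properties treats '#' depending on where it appears in a line.
public class FlumeConfCommentSketch {

    public static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected with an in-memory reader
        }
        return props;
    }

    public static void main(String[] args) {
        String conf =
              "# the type is defined agent.sources.testSource.type = avro\n"
            + "agent.sources.testSource.bind = 127.0.0.1\n"
            + "agent.channels.testChannel.transactionCapacity = 100 # note\n";
        Properties props = load(conf);
        // The whole first line is a comment, so the type setting is never defined.
        System.out.println(props.getProperty("agent.sources.testSource.type")); // prints null
        System.out.println(props.getProperty("agent.sources.testSource.bind")); // prints 127.0.0.1
        // A '#' after a value is ordinary text and stays in the value.
        System.out.println(props.getProperty("agent.channels.testChannel.transactionCapacity")); // prints 100 # note
    }
}
```

Keeping every comment on its own line avoids both problems.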