BigData/Flume
Flume RpcClient 사용기
Tomining
2016. 1. 20. 21:16
Flume Avro 데이터를 전송하기 위한 방법은 몇가지가 있으나, 보통 쉬운 log4j appender 를 이용하는 것으로 알고 있다.
log4j appender 의 경우 flume-ng-sdk.jar 에서 제공되기 때문에 log4j 설정만 하면되는 간편함이 있다.
하지만 log4j 특성상 전송되는 Event 에 Header에 데이터를 포함해서 전송할 수 없다.
만약 Flume 으로 데이터 전송시 Header 에 원하는 데이터를 전달할 수 있다면, Multiplexing 설정을 통해 좀 더 General 한 source-channel-sink 구조를 만들 수 있을 것이라 생각했다. 이를 위해서 Flume 에서 제공되는 것이 RpcClient 클래스이다.
Flume 1.6.0(최신버전 at 2016/01/19) 와 java 1.8 기반에서 테스트를 진행하였다.
단일 Host 를 대상으로 Event 를 전송하는 코드 샘플이다.
public class SingleFlumeClient {
private final String host; private final int port; private RpcClient client; public SingleFlumeClient(String host, int port) { this.host = host; this.port = port; this.client = RpcClientFactory.getDefaultInstance(host, port); } public String getHost() { return host; } public int getPort() { return port; } public RpcClient getClient() { return client; } public void send(String data) { this.send(data, "UTF-8", null); } public void send(String data, Map<String, String> header) { this.send(data, "UTF-8", header); } private void send(String data, String encoding, Map<String, String> header) { Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"), header); try { getClient().append(event); } catch (EventDeliveryException e) { reconnect(); } } public void clear() { this.client.close(); } private void reconnect() { this.client.close(); this.client = null; this.client = RpcClientFactory.getDefaultInstance(getHost(), getPort()); } } |
public class SingleFlumeClientTest {
private static final String FLUME_HOST = “127.0.0.1";
private static final int FLUME_PORT = 9999;
private SingleFlumeClient client; @Before public void setUp() { client = new SingleFlumeClient(FLUME_HOST, FLUME_PORT); } @Test
public void Flume_Avro_DEFAULT_TYPE() {
Map<String, String> header = new HashMap<String, String>();
client.send("Default_Type......................................", header);
}
@Test public void Flume_Avro_MONGO_TYPE() {
Map<String, String> header = new HashMap<String, String>();
header.put("type", "MONGO"); client.send("MONGO_Type......................................", header);
}
@Test
public void Flume_Avro_FILE_TYPE() {
Map<String, String> header = new HashMap<String, String>();
header.put("type", "FILE"); client.send("FILE_Type......................................", header);
}
} |
위 TC 를 수행하기 전에 먼저 flume source-channel-sink 설정이 필요하다.
agent.sources = testSource
agent.channels = testChannel testChannel2
agent.sinks = testMongoSink testFileSink
|
agent.sources.testSource.type = avro
agent.sources.testSource.bind = 127.0.0.1
agent.sources.testSource.port = 9999
agent.sources.testSource.channels = testChannel testChannel2
agent.sources.testSource.selector.type = multiplexing
agent.sources.testSource.selector.header = type
agent.sources.testSource.selector.mapping.MONGO = testChannel
agent.sources.testSource.selector.mapping.FILE = testChannel2
agent.sources.testSource.selector.default = testChannel
|
# channel 1
agent.channels.testChannel.type = memory
agent.channels.testChannel.capacity = 10000
agent.channels.testChannel.transactionCapacity = 10000
agent.channels.testChannel.byteCapacity = 536870912
# channel 2
agent.channels.testChannel2.type = memory
agent.channels.testChannel2.capacity = 10000
agent.channels.testChannel2.transactionCapacity = 10000
agent.channels.testChannel2.byteCapacity = 536870912
|
# Mongo Sink
agent.sinks.testMongoSink.type = Custom Mongo Sink
# Mongo Config...
agent.sinks.testMongoSink.channel = testChannel
# File Sink
agent.sinks.testFileSink.type = file_roll
agent.sinks.testFileSink.sink.directory = /home1/irteam/logs/temp/flume
agent.sinks.testFileSink.channel = testChannel2
|
위 설정을 했다면 Flume 을 재구동한 뒤 Testcase 를 수행해 보자.
정상적으로 File sink 를 통해 내용이 저장됨을 확인할 수 있다. Mongo Sink 또한 별도 툴을 이용하여 확인해 보면 데이터가 저장됨을 확인 할 수 있다.
이렇게만 하면 Flume 을 통해 많은 실시간 데이터를 전송하고 저장할 수 있을까?
아직도 고려해야하는 사항이 많이 있다.
- 얼마나 많은 데이터를 하나의 Flume 서버에서 받을 수 있을까?
L4 장비를 통해 LoadBalancing 을 할 수 있으나, L4를 사용할 수 없다면?? - 만약 그 Flume 서버가 죽는다면? 대안이 있는가?
위 케이를 위해서 Flume 에서는 두 가지 종류의 Client 를 추가로 지원한다.
- LoadBalancing Client
- FailOver Client
이 외에도 Secure Client 로 접속시 인증을 거치도록 하는 방식도 있다.
상세한 내용은 Flume 개발자 가이드 페이지를 참고하자.
여기서 LoadBalancing Client 를 구현해보자.
public class LoadBalancingFlumeClient {
private static final String DEFAULT_CLIENT_TYPE = "default_loadbalance"; private static final String DEFAULT_SELECTOR = "random"; // round_robin private static final boolean DEFAULT_BACKOFF = true; private static final int DEFAULT_MAX_BACKOFF = 10000; private String clientType; private String[] hosts; private String selector; private boolean backOff; private int maxBackOff; private Properties props = null; private RpcClient client; private Properties getProperties() { if (props == null) { props = new Properties(); props.put("client.type", this.getClientType()); props.put("host-selector", this.getSelector()); props.put("backoff", Boolean.toString(this.isBackOff())); props.put("maxBackoff", Integer.toString(this.getMaxBackOff())); props = setHostsInfo(props); } return props; } private Properties setHostsInfo(Properties props) { String[] hosts = this.getHosts(); int numOfHosts = ArrayUtils.getLength(hosts); String[] hostsAlias = new String[numOfHosts]; for (int index = 0; index < numOfHosts; index++) { hostsAlias[index] = "host" + index; props.put("hosts." + hostsAlias[index], hosts[index]); } props.put("hosts", StringUtils.join(hostsAlias, FlumeConstants.DELEMETER_WHITE_SPACE)); return props; } public LoadBalancingFlumeClient(String hostList) { this(DEFAULT_CLIENT_TYPE, hostList, DEFAULT_SELECTOR, DEFAULT_BACKOFF, DEFAULT_MAX_BACKOFF); } public LoadBalancingFlumeClient(String clientType, String hostList, String selector, boolean backOff, int maxBackOff) { this.clientType = clientType; this.hosts = StringUtils.split(hostList, FlumeConstants.DELEMETER_COMMA); this.selector = selector; this.backOff = backOff; this.maxBackOff = maxBackOff; this.client = RpcClientFactory.getInstance(getProperties()); } public String getClientType() { return clientType; } public String[] getHosts() { return hosts; } public String getSelector() { return selector; } public boolean isBackOff() { return backOff; } public int getMaxBackOff() { return maxBackOff; } public RpcClient getClient() { return client; } public void send(String data) { this.send(data, "UTF-8", null); } public void send(String data, Map<String, String> header) { this.send(data, "UTF-8", header); } private void send(String data, String encoding, Map<String, String> header) { Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"), header); try { getClient().append(event); } catch (EventDeliveryException e) { reconnect(); } } public void clear() { this.client.close(); } private void reconnect() { this.client.close(); this.client = null; this.client = RpcClientFactory.getInstance(getProperties()); } } |
public class LoadBalancingFlumeClientTest {
private static final String FLUME_HOSTS = "10.101.57.130:9999,10.101.56.237:9999"; private LoadBalancingFlumeClient client; @Before public void setUp() { client = new LoadBalancingFlumeClient(FLUME_HOSTS); } @Test
public void Flume_Avro_DEFAULT_TYPE() {
Map<String, String> header = new HashMap<String, String>();
for (int index = 0; index < 10; index++) {
client.send("Default_Type......................................" + index, header);
}
} @Test public void Flume_Avro_MONGO_TYPE() {
Map<String, String> header = new HashMap<String, String>();
header.put("type", "MONGO"); for (int index = 0; index < 10; index++) { client.send("MONGO_Type......................................" + index, header);
}
} @Test public void Flume_Avro_FILE_TYPE() {
Map<String, String> header = new HashMap<String, String>();
header.put("type", "FILE"); for (int index = 0; index < 10; index++) { client.send("FILE_Type......................................" + index, header);
}
} } |
코드를 살펴보면 단일 호스트와의 차이점은 Host 설정시 방법의 차이일 뿐이다.
자세한 설명은 생략한다.
TC 를 실행해 보면 정상적으로 LoadBalancing 되어 전송됨을 확인할 수 있다.
LoadBalancing 방식은 random 과 round_robin 방식을 지원하며, 기본값은 random 이다.
이로써 적절한 Client 를 선택해 구현하면 Flume 전송을 효율적으로 할 수 있다.
Header 데이터를 통해서 좀 더 유연한 데이터 수집 플랫폼도 만들어 볼 수 있을 것 같다.
참고사항