BigData/Flume

Flume RpcClient 사용기

Tomining 2016. 1. 20. 21:16
Flume Avro 데이터를 전송하기 위한 방법은 몇가지가 있으나, 보통 쉬운 log4j appender 를 이용하는 것으로 알고 있다.
log4j appender 의 경우 flume-ng-sdk.jar 에서 제공되기 때문에 log4j 설정만 하면되는 간편함이 있다.
하지만 log4j 특성상 전송되는 Event 에 Header에 데이터를 포함해서 전송할 수 없다.

만약 Flume 으로 데이터 전송시 Header 에 원하는 데이터를 전달할 수 있다면, Multiplexing 설정을 통해 좀 더 General 한 source-channel-sink 구조를 만들 수 있을 것이라 생각했다. 이를 위해서 Flume 에서 제공되는 것이 RpcClient 클래스이다.

Flume 1.6.0(최신버전 at 2016/01/19) 와 java 1.8 기반에서 테스트를 진행하였다.

단일 Host 를 대상으로 Event 를 전송하는 코드 샘플이다.

public class SingleFlumeClient {
    private final String host;
    private final int port;
    private RpcClient client;

    public SingleFlumeClient(String host, int port) {
        this.host = host;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(host, port);
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    public RpcClient getClient() {
        return client;
    }

    public void send(String data) {
        this.send(data, "UTF-8"null);
    }

    public void send(String data, Map<String, String> header) {
        this.send(data, "UTF-8", header);
    }

    private void send(String data, String encoding, Map<String, String> header) {
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"), header);

        try {
            getClient().append(event);
        } catch (EventDeliveryException e) {
            reconnect();
        }
    }

    public void clear() {
        this.client.close();
    }

    private void reconnect() {
        this.client.close();
        this.client null;
        this.client = RpcClientFactory.getDefaultInstance(getHost(), getPort());
    }
}
public class SingleFlumeClientTest {
    private static final String FLUME_HOST “127.0.0.1";
    private static final int FLUME_PORT 9999;
    private SingleFlumeClient client;

    @Before
    public void setUp() {
        client new SingleFlumeClient(FLUME_HOSTFLUME_PORT);
    }

    @Test
    public void Flume_Avro_DEFAULT_TYPE() {
        Map<String, String> header = new HashMap<String, String>();
        client.send("Default_Type......................................", header);
    }

    @Test
    public void Flume_Avro_MONGO_TYPE() {
        Map<String, String> header = new HashMap<String, String>();
        header.put("type""MONGO");
        client.send("MONGO_Type......................................", header);
    }

    @Test
    public void Flume_Avro_FILE_TYPE() {
        Map<String, String> header = new HashMap<String, String>();
        header.put("type""FILE");
        client.send("FILE_Type......................................", header);
    }
}

위 TC 를 수행하기 전에 먼저 flume source-channel-sink 설정이 필요하다.

agent.sources = testSource
agent.channels = testChannel testChannel2
agent.sinks = testMongoSink testFileSink
agent.sources.testSource.type = avro
agent.sources.testSource.bind = 127.0.0.1
agent.sources.testSource.port = 9999
agent.sources.testSource.channels = testChannel testChannel2
agent.sources.testSource.selector.type = multiplexing
agent.sources.testSource.selector.header = type
agent.sources.testSource.selector.mapping.MONGO = testChannel
agent.sources.testSource.selector.mapping.FILE = testChannel2
agent.sources.testSource.selector.default = testChannel
# channel 1
agent.channels.testChannel.type = memory
agent.channels.testChannel.capacity = 10000
agent.channels.testChannel.transactionCapacity = 10000
agent.channels.testChannel.byteCapacity = 536870912
# channel 2
agent.channels.testChannel2.type = memory
agent.channels.testChannel2.capacity = 10000
agent.channels.testChannel2.transactionCapacity = 10000
agent.channels.testChannel2.byteCapacity = 536870912
# Mongo Sink
agent.sinks.testMongoSink.type = Custom Mongo Sink
# Mongo Config...
agent.sinks.testMongoSink.channel = testChannel
# File Sink
agent.sinks.testFileSink.type = file_roll
agent.sinks.testFileSink.sink.directory = /home1/irteam/logs/temp/flume
agent.sinks.testFileSink.channel = testChannel2

위 설정을 했다면 Flume 을 재구동한 뒤 Testcase 를 수행해 보자.


정상적으로 File sink 를 통해 내용이 저장됨을 확인할 수 있다. Mongo Sink 또한 별도 툴을 이용하여 확인해 보면 데이터가 저장됨을 확인 할 수 있다.

이렇게만 하면 Flume 을 통해 많은 실시간 데이터를 전송하고 저장할 수 있을까?
아직도 고려해야하는 사항이 많이 있다.

  1. 얼마나 많은 데이터를 하나의 Flume 서버에서 받을 수 있을까?
    L4 장비를 통해 LoadBalancing 을 할 수 있으나, L4를 사용할 수 없다면??
  2. 만약 그 Flume 서버가 죽는다면? 대안이 있는가? 

위 케이를 위해서 Flume 에서는 두 가지 종류의 Client 를 추가로 지원한다.
  • LoadBalancing Client
  • FailOver Client

이 외에도 Secure Client 로 접속시 인증을 거치도록 하는 방식도 있다.
상세한 내용은 Flume 개발자 가이드 페이지를 참고하자.

여기서 LoadBalancing Client 를 구현해보자.

public class LoadBalancingFlumeClient {
    private static final String DEFAULT_CLIENT_TYPE "default_loadbalance";
    private static final String DEFAULT_SELECTOR "random";    // round_robin
    private static final boolean DEFAULT_BACKOFF true;
    private static final int DEFAULT_MAX_BACKOFF 10000;

    private String clientType;
    private String[] hosts;
    private String selector;
    private boolean backOff;
    private int maxBackOff;

    private Properties props null;
    private RpcClient client;

    private Properties getProperties() {
        if (props == null) {
            props new Properties();
            props.put("client.type"this.getClientType());
            props.put("host-selector"this.getSelector());
            props.put("backoff", Boolean.toString(this.isBackOff()));
            props.put("maxBackoff", Integer.toString(this.getMaxBackOff()));
            props = setHostsInfo(props);
        }

        return props;
    }

    private Properties setHostsInfo(Properties props) {
        String[] hosts = this.getHosts();
        int numOfHosts = ArrayUtils.getLength(hosts);
        String[] hostsAlias = new String[numOfHosts];
        for (int index = 0; index < numOfHosts; index++) {
            hostsAlias[index] = "host" + index;
            props.put("hosts." + hostsAlias[index], hosts[index]);
        }
        props.put("hosts", StringUtils.join(hostsAlias, FlumeConstants.DELEMETER_WHITE_SPACE));

        return props;
    }

    public LoadBalancingFlumeClient(String hostList) {
        this(DEFAULT_CLIENT_TYPE, hostList, DEFAULT_SELECTORDEFAULT_BACKOFFDEFAULT_MAX_BACKOFF);
    }

    public LoadBalancingFlumeClient(String clientType, String hostList, String selector, boolean backOff, int maxBackOff) {
        this.clientType = clientType;
        this.hosts = StringUtils.split(hostList, FlumeConstants.DELEMETER_COMMA);
        this.selector = selector;
        this.backOff = backOff;
        this.maxBackOff = maxBackOff;

        this.client = RpcClientFactory.getInstance(getProperties());
    }

    public String getClientType() {
        return clientType;
    }

    public String[] getHosts() {
        return hosts;
    }

    public String getSelector() {
        return selector;
    }

    public boolean isBackOff() {
        return backOff;
    }

    public int getMaxBackOff() {
        return maxBackOff;
    }

    public RpcClient getClient() {
        return client;
    }

    public void send(String data) {
        this.send(data, "UTF-8"null);
    }

    public void send(String data, Map<String, String> header) {
        this.send(data, "UTF-8", header);
    }

    private void send(String data, String encoding, Map<String, String> header) {
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"), header);

        try {
            getClient().append(event);
        } catch (EventDeliveryException e) {
            reconnect();
        }
    }


    public void clear() {
        this.client.close();
    }

    private void reconnect() {
        this.client.close();
        this.client null;
        this.client = RpcClientFactory.getInstance(getProperties());
    }
}
public class LoadBalancingFlumeClientTest {
    private static final String FLUME_HOSTS "10.101.57.130:9999,10.101.56.237:9999";
    private LoadBalancingFlumeClient client;

    @Before
    public void setUp() {
        client new LoadBalancingFlumeClient(FLUME_HOSTS);
    }

    @Test
    public void Flume_Avro_DEFAULT_TYPE() {
        Map<String, String> header = new HashMap<String, String>();
        for (int index = 0; index < 10; index++) {
            client.send("Default_Type......................................" + index, header);
        }
    }

    @Test
    public void Flume_Avro_MONGO_TYPE() {
        Map<String, String> header = new HashMap<String, String>();
        header.put("type""MONGO");
        for (int index = 0; index < 10; index++) {
            client.send("MONGO_Type......................................" + index, header);
        }
    }

    @Test
    public void Flume_Avro_FILE_TYPE() {
        Map<String, String> header = new HashMap<String, String>();
        header.put("type""FILE");
        for (int index = 0; index < 10; index++) {
            client.send("FILE_Type......................................" + index, header);
        }
    }
}

코드를 살펴보면 단일 호스트와의 차이점은 Host 설정시 방법의 차이일 뿐이다.
자세한 설명은 생략한다.

TC 를 실행해 보면 정상적으로 LoadBalancing 되어 전송됨을 확인할 수 있다.
LoadBalancing 방식은 random 과 round_robin 방식을 지원하며, 기본값은 random 이다.

이로써 적절한 Client 를 선택해 구현하면 Flume 전송을 효율적으로 할 수 있다.
Header 데이터를 통해서 좀 더 유연한 데이터 수집 플랫폼도 만들어 볼 수 있을 것 같다.


참고사항