본문 바로가기
BigData/Flume

Custom Sink 만들어보기

by Tomining 2015. 5. 12.
Flume Custom Sink로 Mongo Sink를 한 번 만들어보자.

1. 단일 건별 Sink 처리
public class MongoSingleSink extends AbstractSink implements Configurable {
     private static final Log LOG = LogFactory.getLog(MongoSingleSink.class);

     private String host;
     private int port;
     private String dbName;
     private Mongo mongo;
     private DB db;

     @Override
     public synchronized void start() {
          try {
               mongo = new MongoClient(host, port);
               db = mongo.getDB(dbName);
          } catch (UnknownHostException e) {
               LOG.info("MongoDB connection error", e);
          }
     }

     public void configure(Context context) {
          host = context.getString("host");
          port = context.getInteger("port");
          dbName = context.getString("db");
     }

     public Status process() throws EventDeliveryException {
          Status status = Status.READY;
          try {
               status = parseEvents();
          } catch (Exception e) {
               LOG.info("mongo sink error", e);
          }

          return status;

     }

     private Status parseEvents() {
          Status status = Status.READY;
          Channel channel = getChannel();
          Transaction tx = null;

          try {
               tx = channel.getTransaction();
               tx.begin();

               Event event = channel.take();
               if (event == null) {
                    status = Status.BACKOFF;
               } else {
                    saveEvents(addEventToList(event));
               }

               tx.commit();
          } catch (Exception e) {
               LOG.error("can't process events, drop it!", e);
               if (tx != null) {
                    tx.commit();
               }
          } finally {
               if (tx != null) {
                    tx.close();
               }
          }

          return status;
     }

     private Map<String, DBObject> addEventToList(Event event) {
          Map<String, DBObject> docMap = new HashMap<String, DBObject>();
          byte[] body = event.getBody();
          DBObject eventJson = null;

          try {
               String bodyStr = new String(body);
               if (LOG.isDebugEnabled()) {
                    LOG.debug("Mongo Sink : " + bodyStr);
               }
              
               int splitIndex = StringUtils.indexOf(bodyStr, ",");
               if (splitIndex < 0) {
                    throw new Exception("invalid body format");
               } else {
                    eventJson = (DBObject) JSON.parse(StringUtils.substring(bodyStr, splitIndex + 1));
                    docMap.put(StringUtils.substring(bodyStr, 0, splitIndex), eventJson);
               }
          } catch (Exception e) {
               LOG.error("Can't parse events: " + new String(body), e);
               return docMap;
          }

          return docMap;
     }

     private void saveEvents(Map<String, DBObject> document) {
          if (MapUtils.isEmpty(document)) {
               LOG.info("documents is empty");
               return;
          }

          for (String key : document.keySet()) {
               CommandResult result = db.getCollection(key).insert((DBObject) MapUtils.getObject(document, key), WriteConcern.NORMAL).getLastError();
               if (result.ok()) {
                    String errorMessage = result.getErrorMessage();
                    if (errorMessage != null) {
                         LOG.error("can't insert documents with error: {} " + errorMessage);
                         LOG.error("with exception", result.getException());
                    }
               } else {
                    LOG.error("can't get last error");
               }
          }
     }
}

2. 묶음 처리 Sink
public class MongoBulkSink extends AbstractSink implements Configurable {
     private static final Log LOG = LogFactory.getLog(MongoBulkSink.class);
    
     private String host;
     private int port;
     private String dbName;
     private int batchSize;
     private Mongo mongo;
     private DB db;
    
     @Override
     public synchronized void start() {
          try {
               mongo = new MongoClient(host, port);
               db = mongo.getDB(dbName);
          } catch (UnknownHostException e) {
               LOG.info("MongoDB connection error", e);
          }
     }
    
     public void configure(Context context) {
          host = context.getString("host");
          port = context.getInteger("port");
          dbName = context.getString("db");
          batchSize = context.getInteger("size");
     }

     public Status process() throws EventDeliveryException {
        Status status = Status.READY;
        try {
            status = parseEvents();
        } catch (Exception e) {
             LOG.info("mongo sink error", e);
        }
       
        return status;

     }

     private Status parseEvents() {
          Status status = Status.READY;
        Channel channel = getChannel();
        Transaction tx = null;
       
        try {
            tx = channel.getTransaction();
            tx.begin();

            status = processEvents(status, channel);

            tx.commit();
        } catch (Exception e) {
             LOG.error("can't process events, drop it!", e);
            if (tx != null) {
                tx.commit();
            }
        } finally {
            if (tx != null) {
                tx.close();
            }
        }
       
        return status;
     }

     private Status processEvents(Status status, Channel channel) {
          Map<String, List<DBObject>> inputMap = new HashMap<String, List<DBObject>>();
         
          for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
               Event event = channel.take();
               if (event == null) {
                    status = Status.BACKOFF;
                    break;
               }
              
               inputMap = addEventToList(inputMap, event);
          }
         
          saveEvents(inputMap);
         
          return status;
     }
    
     /**
     * Collection별로 이벤트 구분
     * @param inputMap
     * @param event
     * @return
     */
     private Map<String, List<DBObject>> addEventToList(Map<String, List<DBObject>> inputMap, Event event) {
        byte[] body = event.getBody();
       
        try {
             String bodyStr = new String(body);
             if (LOG.isDebugEnabled()) {
                    LOG.debug("Mongo Sink : " + bodyStr);
               }
            
             int splitIndex = StringUtils.indexOf(bodyStr, ",");
             if (splitIndex < 0) {
                  throw new Exception("invalid body format");
               } else {
                    String collectionName = StringUtils.substring(bodyStr, 0, splitIndex);
                    List<DBObject> tempList = getInputDataList(inputMap, collectionName);
                    tempList.add((DBObject) JSON.parse(StringUtils.substring(bodyStr, splitIndex + 1)));
                    inputMap.put(collectionName, tempList);
               }
        } catch (Exception e) {
            LOG.error("Can't parse events: " + new String(body), e);
            return inputMap;
        }

        return inputMap;
    }

     @SuppressWarnings("unchecked")
     private List<DBObject> getInputDataList(Map<String, List<DBObject>> inputMap, String collectionName) {
          if (inputMap.containsKey(collectionName)) {
               return (List<DBObject>) MapUtils.getObject(inputMap, collectionName);
          } else {
               return new ArrayList<DBObject>();
          }
     }

     /**
     * 몽고DB 저장
     * @param documents
     */
     @SuppressWarnings("unchecked")
     private void saveEvents(Map<String, List<DBObject>> documents) {
          if (MapUtils.isEmpty(documents)) {
            LOG.info("documents is empty");
            return;
        }

          for (String key : documents.keySet()) {
               CommandResult result = db.getCollection(key).insert((List<DBObject>) MapUtils.getObject(documents, key), WriteConcern.NORMAL).getLastError();
               checkResult(result);
          }
    }

     /**
     * MongoDB 저장 결과 확인
     * @param result
     */
     private void checkResult(CommandResult result) {
          if (result.ok()) {
               String errorMessage = result.getErrorMessage();
               if (errorMessage != null) {
                    LOG.error("can't insert documents with error: {} "+ errorMessage);
                    LOG.error("with exception", result.getException());
               }
          } else {
               LOG.error("can't get last error");
          }
     }
}


maven build 시 jar 내에 dependency를 모두 포함해야 하므로 pom.xml 에 아래와 같이 설정이 필요하다.
<build>
     <plugins>
          <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-surefire-plugin</artifactId>
               <version>2.12.3</version>
          </plugin>
          <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-compiler-plugin</artifactId>
               <version>3.0</version>
               <configuration>
                    <verbose>true</verbose>
                    <fork>true</fork>
                    <compilerVersion>1.6</compilerVersion>
               </configuration>
          </plugin>
          <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-jar-plugin</artifactId>
               <version>2.4</version>
               <configuration>
                    <finalName>${jar.name}</finalName>
                    <outputDirectory>${deploy.target.dir}</outputDirectory>
               </configuration>
               <executions>
                    <execution>
                         <id>make-a-jar</id>
                         <phase>compile</phase>
                         <goals>
                              <goal>jar</goal>
                         </goals>
                    </execution>
               </executions>
          </plugin>
          <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-assembly-plugin</artifactId>
               <version>2.2.1</version>
               <configuration>
                    <descriptorRefs>
                         <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
               </configuration>
          </plugin>
     </plugins>
</build>

* maven-jar-plugin : jar로 패키징 하기 위한 maven plugin
* maven-assembly-plugin : jar 내에 dependency를 포함하기 위한 maven plugin

build 시 goal은 아래와 같이 준다.
clean compile dependency:copy-dependencies assembly:assembly