Let's build a MongoDB sink as a custom Flume sink.
1. Single-event sink processing
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoClient;
import com.mongodb.WriteConcern;
import com.mongodb.util.JSON;

public class MongoSingleSink extends AbstractSink implements Configurable {

    private static final Log LOG = LogFactory.getLog(MongoSingleSink.class);

    private String host;
    private int port;
    private String dbName;
    private Mongo mongo;
    private DB db;

    @Override
    public synchronized void start() {
        try {
            mongo = new MongoClient(host, port);
            db = mongo.getDB(dbName);
        } catch (UnknownHostException e) {
            LOG.error("MongoDB connection error", e);
        }
    }

    public void configure(Context context) {
        host = context.getString("host");
        port = context.getInteger("port");
        dbName = context.getString("db");
    }

    public Status process() throws EventDeliveryException {
        Status status = Status.READY;
        try {
            status = parseEvents();
        } catch (Exception e) {
            LOG.error("mongo sink error", e);
        }
        return status;
    }

    private Status parseEvents() {
        Status status = Status.READY;
        Channel channel = getChannel();
        Transaction tx = null;
        try {
            tx = channel.getTransaction();
            tx.begin();
            Event event = channel.take();
            if (event == null) {
                status = Status.BACKOFF;
            } else {
                saveEvents(addEventToList(event));
            }
            tx.commit();
        } catch (Exception e) {
            // Commit even on failure so the broken event is dropped
            // instead of being redelivered forever.
            LOG.error("can't process events, drop it!", e);
            if (tx != null) {
                tx.commit();
            }
        } finally {
            if (tx != null) {
                tx.close();
            }
        }
        return status;
    }

    /**
     * Parses an event body of the form "collectionName,{json document}"
     * into a (collection name -> document) map.
     */
    private Map<String, DBObject> addEventToList(Event event) {
        Map<String, DBObject> docMap = new HashMap<String, DBObject>();
        byte[] body = event.getBody();
        try {
            String bodyStr = new String(body);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Mongo Sink : " + bodyStr);
            }
            int splitIndex = StringUtils.indexOf(bodyStr, ",");
            if (splitIndex < 0) {
                throw new Exception("invalid body format");
            }
            DBObject eventJson = (DBObject) JSON.parse(StringUtils.substring(bodyStr, splitIndex + 1));
            docMap.put(StringUtils.substring(bodyStr, 0, splitIndex), eventJson);
        } catch (Exception e) {
            LOG.error("Can't parse events: " + new String(body), e);
        }
        return docMap;
    }

    private void saveEvents(Map<String, DBObject> document) {
        if (MapUtils.isEmpty(document)) {
            LOG.info("documents is empty");
            return;
        }
        for (String key : document.keySet()) {
            CommandResult result = db.getCollection(key)
                    .insert((DBObject) MapUtils.getObject(document, key), WriteConcern.NORMAL)
                    .getLastError();
            if (result.ok()) {
                String errorMessage = result.getErrorMessage();
                if (errorMessage != null) {
                    LOG.error("can't insert documents with error: " + errorMessage);
                    LOG.error("with exception", result.getException());
                }
            } else {
                LOG.error("can't get last error");
            }
        }
    }
}
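Each event body is expected in the form "collectionName,{json document}": everything before the first comma selects the target collection, the rest is parsed as JSON. As a minimal configuration sketch, the sink could be wired into an agent like this (the package com.example.flume and the agent/channel names are placeholders, not part of the code above):

# hypothetical names: adjust the fully qualified class name, agent, and channel to your setup
agent.sinks = mongoSink
agent.sinks.mongoSink.type = com.example.flume.MongoSingleSink
agent.sinks.mongoSink.channel = memoryChannel
agent.sinks.mongoSink.host = localhost
agent.sinks.mongoSink.port = 27017
agent.sinks.mongoSink.db = flume

The host, port, and db keys correspond to what configure() reads from the Context.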
2. Batch-processing sink
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoClient;
import com.mongodb.WriteConcern;
import com.mongodb.util.JSON;

public class MongoBulkSink extends AbstractSink implements Configurable {

    private static final Log LOG = LogFactory.getLog(MongoBulkSink.class);

    private String host;
    private int port;
    private String dbName;
    private int batchSize;
    private Mongo mongo;
    private DB db;

    @Override
    public synchronized void start() {
        try {
            mongo = new MongoClient(host, port);
            db = mongo.getDB(dbName);
        } catch (UnknownHostException e) {
            LOG.error("MongoDB connection error", e);
        }
    }

    public void configure(Context context) {
        host = context.getString("host");
        port = context.getInteger("port");
        dbName = context.getString("db");
        batchSize = context.getInteger("size");
    }

    public Status process() throws EventDeliveryException {
        Status status = Status.READY;
        try {
            status = parseEvents();
        } catch (Exception e) {
            LOG.error("mongo sink error", e);
        }
        return status;
    }

    private Status parseEvents() {
        Status status = Status.READY;
        Channel channel = getChannel();
        Transaction tx = null;
        try {
            tx = channel.getTransaction();
            tx.begin();
            status = processEvents(status, channel);
            tx.commit();
        } catch (Exception e) {
            // Commit even on failure so broken events are dropped
            // instead of being redelivered forever.
            LOG.error("can't process events, drop it!", e);
            if (tx != null) {
                tx.commit();
            }
        } finally {
            if (tx != null) {
                tx.close();
            }
        }
        return status;
    }

    /**
     * Takes up to batchSize events from the channel and saves them in one batch.
     */
    private Status processEvents(Status status, Channel channel) {
        Map<String, List<DBObject>> inputMap = new HashMap<String, List<DBObject>>();
        for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
            Event event = channel.take();
            if (event == null) {
                status = Status.BACKOFF;
                break;
            }
            inputMap = addEventToList(inputMap, event);
        }
        saveEvents(inputMap);
        return status;
    }

    /**
     * Groups events by target collection.
     */
    private Map<String, List<DBObject>> addEventToList(Map<String, List<DBObject>> inputMap, Event event) {
        byte[] body = event.getBody();
        try {
            String bodyStr = new String(body);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Mongo Sink : " + bodyStr);
            }
            int splitIndex = StringUtils.indexOf(bodyStr, ",");
            if (splitIndex < 0) {
                throw new Exception("invalid body format");
            }
            String collectionName = StringUtils.substring(bodyStr, 0, splitIndex);
            List<DBObject> tempList = getInputDataList(inputMap, collectionName);
            tempList.add((DBObject) JSON.parse(StringUtils.substring(bodyStr, splitIndex + 1)));
            inputMap.put(collectionName, tempList);
        } catch (Exception e) {
            LOG.error("Can't parse events: " + new String(body), e);
        }
        return inputMap;
    }

    @SuppressWarnings("unchecked")
    private List<DBObject> getInputDataList(Map<String, List<DBObject>> inputMap, String collectionName) {
        if (inputMap.containsKey(collectionName)) {
            return (List<DBObject>) MapUtils.getObject(inputMap, collectionName);
        }
        return new ArrayList<DBObject>();
    }

    /**
     * Bulk-inserts the grouped documents into MongoDB, one insert per collection.
     */
    @SuppressWarnings("unchecked")
    private void saveEvents(Map<String, List<DBObject>> documents) {
        if (MapUtils.isEmpty(documents)) {
            LOG.info("documents is empty");
            return;
        }
        for (String key : documents.keySet()) {
            CommandResult result = db.getCollection(key)
                    .insert((List<DBObject>) MapUtils.getObject(documents, key), WriteConcern.NORMAL)
                    .getLastError();
            checkResult(result);
        }
    }

    /**
     * Checks the result of a MongoDB write.
     */
    private void checkResult(CommandResult result) {
        if (result.ok()) {
            String errorMessage = result.getErrorMessage();
            if (errorMessage != null) {
                LOG.error("can't insert documents with error: " + errorMessage);
                LOG.error("with exception", result.getException());
            }
        } else {
            LOG.error("can't get last error");
        }
    }
}
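The bulk sink is configured the same way, plus the size property that configure() reads as the batch size. Draining up to size events per transaction and inserting each collection's documents with a single insert() call cuts the number of MongoDB round trips compared to the single-event sink. Again a sketch with placeholder names:

# hypothetical names as before; size maps to the batchSize field
agent.sinks.mongoBulkSink.type = com.example.flume.MongoBulkSink
agent.sinks.mongoBulkSink.channel = memoryChannel
agent.sinks.mongoBulkSink.host = localhost
agent.sinks.mongoBulkSink.port = 27017
agent.sinks.mongoBulkSink.db = flume
agent.sinks.mongoBulkSink.size = 100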
Since the jar must bundle all of its dependencies when built with Maven, the following configuration is needed in pom.xml.
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.12.3</version>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <verbose>true</verbose>
                <fork>true</fork>
                <compilerVersion>1.6</compilerVersion>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>2.4</version>
            <configuration>
                <finalName>${jar.name}</finalName>
                <outputDirectory>${deploy.target.dir}</outputDirectory>
            </configuration>
            <executions>
                <execution>
                    <id>make-a-jar</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>jar</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.2.1</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>
* maven-jar-plugin: Maven plugin that packages the build into a jar
* maven-assembly-plugin: Maven plugin that bundles the dependencies inside the jar
At build time, pass the goals as follows.
clean compile dependency:copy-dependencies assembly:assembly
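Assuming a standard Maven setup, the full command line would be something like:

mvn clean compile dependency:copy-dependencies assembly:assembly

The resulting *-jar-with-dependencies.jar then has to be visible to the Flume agent, for example by dropping it into the agent's lib directory (or a plugins.d entry on newer Flume versions).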