- Maven Project 생성
- pom.xml 설정
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jw.storm</groupId>
<artifactId>hellostorm</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>hellostorm</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass />
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> - 구조
Spout에서 Bolt로 "hello storm"이라는 메시지를 전송한다.
실제 클러스터 환경에서는 spout은 외부로부터 데이터를 받아서 가공 후 bolt로 전송해 주는 식으로 적용될 것이다. 여기서 spout과 bolt를 연결해 주는 Topology가 있다.
구조 자체는 Flume과 유사하다. Source에서 Sink로 데이터를 보내고, 이를 연결해 주는 Channel이 존재한다.(Flume)
Flume과 Storm의 역할은 대량의 데이터를 수집하는데 있으므로, 큰 틀에서는 유사하다고 할 수 있다.(근데... 무슨 차이가 있을까?)
- HelloSpout
public class HelloSpout extends BaseRichSpout {
private static final long serialVersionUID = 1L;
private SpoutOutputCollector spoutOutputCollector;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("say"));
}
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
this.spoutOutputCollector.emit(new Values("hello storm"));
}
}
nextTuple()이라는 함수는 외부에서 데이터를 받아들여, 다음 flow로 보내는 역할을 한다.
예제에서는 say라는 필드에 "hello storm"이라는 값을 전달하고, 이 때 emit()함수를 호출한다.
- HelloBolt
public class HelloBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String value = tuple.getStringByField("say");
System.out.println("Tuple value is " + value);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
데이터가 전달이 되면, execute() 함수가 호출이 되는데, tuple에서 데이터를 꺼내 System.out.println()으로 콘솔에 출력하였다.
아마도 실제 클러스터 환경에서는 HDFS에 파일을 작성하거나, MongoDB에 데이터를 넣어두도록 구현도 가능할 것이다.(flume처럼 말이다.)
- HelloTopology
public class HelloTopology {
public static void main(String args[]){
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("HelloSpout", new HelloSpout(), 2);
builder.setBolt("HelloBolt", new HelloBolt(), 4).shuffleGrouping("HelloSpout");
Config conf = new Config();
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("HelloTopology", conf,builder.createTopology());
Utils.sleep(10000);
// kill the LearningStormTopology
cluster.killTopology("HelloTopology");
// shutdown the storm test cluster
cluster.shutdown();
}
}
TopologyBuilder를 통해 spout과 bolt를 연결하고, 테스트 클러스터 환경인 LocalCluster를 통해서 Topology를 수행시킨다.
Config를 통해서 어떤 서버/port에서 수행시킬지 설정할 수 있으나, 예제에서는 생략한다.
Topology는 cluster.submitTopology()를 하게 되면 자동으로 수행이 되며, HelloSpout은 "hello storm" 값을 HelloBolt로 계속 전송하게 된다.
- 실행 결과
위 샘플코드를 수행한 결과(수행 로그1)13645 [Thread-14-HelloSpout] INFO backtype.storm.daemon.task - Emitting: HelloSpout default [hello storm]
13645 [Thread-16-HelloSpout] INFO backtype.storm.daemon.task - Emitting: HelloSpout default [hello storm]
13645 [main] INFO backtype.storm.daemon.executor - Shutting down executor HelloSpout:[5 5]
13645 [Thread-14-HelloSpout] INFO backtype.storm.daemon.task - Emitting: HelloSpout default [hello storm]
13645 [Thread-22-HelloBolt] INFO backtype.storm.daemon.executor - Processing received message source: HelloSpout:5, stream: default, id: {}, [hello storm]
Tuple value is hello storm
13645 [Thread-16-HelloSpout] INFO backtype.storm.daemon.task - Emitting: HelloSpout default [hello storm]
13645 [Thread-22-HelloBolt] INFO backtype.storm.daemon.executor - Processing received message source: HelloSpout:6, stream: default, id: {}, [hello storm]
Tuple value is hello storm
13645 [Thread-16-HelloSpout] INFO backtype.storm.daemon.task - Emitting: HelloSpout default [hello storm]
13645 [Thread-14-HelloSpout] INFO backtype.storm.util - Async loop interrupted!
Spout에서 여러 번 전송하였지만, Bolt에서 받은 값은 몇 개 안 되는 것을 확인할 수 있다. 붉은색 로그가 간간히 출력되면서 로그가 이상하게 찍히는데 왜 일까?
구글링을 하다가 문득 Spout Thread가 Bolt Thread가 다 처리되기 전에 종료된다면 전송되지 못하지 않을까? 라는 생각이 들었다.
그래서 아래와 같이 sleep을 주어봤다.public class HelloSpout extends BaseRichSpout {
private static final long serialVersionUID = 1L;
private SpoutOutputCollector spoutOutputCollector;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("say"));
}
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
this.spoutOutputCollector.emit(new Values("hello storm"));
Utils.sleep(100);
}
}
그 결과...13227 [Thread-22-HelloBolt] INFO backtype.storm.daemon.executor - Processing received message source: HelloSpout:6, stream: default, id: {}, [hello storm]
Tuple value is hello storm
13323 [Thread-14-HelloSpout] INFO backtype.storm.daemon.task - Emitting: HelloSpout default [hello storm]
13323 [Thread-22-HelloBolt] INFO backtype.storm.daemon.executor - Processing received message source: HelloSpout:5, stream: default, id: {}, [hello storm]
Tuple value is hello storm
13327 [Thread-16-HelloSpout] INFO backtype.storm.daemon.task - Emitting: HelloSpout default [hello storm]
13327 [Thread-8-HelloBolt] INFO backtype.storm.daemon.executor - Processing received message source: HelloSpout:6, stream: default, id: {}, [hello storm]
Tuple value is hello storm
13423 [Thread-14-HelloSpout] INFO backtype.storm.daemon.task - Emitting: HelloSpout default [hello storm]
13423 [Thread-12-HelloBolt] INFO backtype.storm.daemon.executor - Processing received message source: HelloSpout:5, stream: default, id: {}, [hello storm]
Tuple value is hello storm
13427 [Thread-16-HelloSpout] INFO backtype.storm.daemon.task - Emitting: HelloSpout default [hello storm]
13427 [Thread-10-HelloBolt] INFO backtype.storm.daemon.executor - Processing received message source: HelloSpout:6, stream: default, id: {}, [hello storm]
Tuple value is hello storm
13523 [Thread-14-HelloSpout] INFO backtype.storm.daemon.task - Emitting: HelloSpout default [hello storm]
13523 [Thread-10-HelloBolt] INFO backtype.storm.daemon.executor - Processing received message source: HelloSpout:5, stream: default, id: {}, [hello storm]
Tuple value is hello storm
13527 [Thread-16-HelloSpout] INFO backtype.storm.daemon.task - Emitting: HelloSpout default [hello storm]
13527 [Thread-22-HelloBolt] INFO backtype.storm.daemon.executor - Processing received message source: HelloSpout:6, stream: default, id: {}, [hello storm]
Tuple value is hello storm
전송한 만큼 Bolt에서 처리되어 콘솔에 출력되는 것을 확인할 수 있었다.
이런 결과라면 메시지 전송시 누락이 있는 것은 아닐까?
- 느낀 점.
Flume과 다르게 별도 설치 없이 maven dependency로 jar를 받아 로컬에서 수행해 볼 수 있었다. 아직 샘풀만 돌려봤지만. storm 도 log4j와 연동이 되어 로그 전송이 쉬울까? 라는 의문이 든다.
문서를 보니... Zookeeper가 내장되어 있는 거 같은데... Flume보다는 Load Balancing이 쉬울 거 같다는 생각이 든다.
아직 Flume을 Storm으로 변경해야겠다는 매력적인 요소가 보이지 않는다.
BigData/Storm