티스토리 뷰

RDD 개념을 갖고 있는 Spark 와 유사하게 Spark Streaming 은 DStream 또는 Discretized Streams 라고 불리는 추상개념을 갖고 있다.
DStream 이란? 시간 흐름에 따른 순차적 데이터를 의미한다. 내부적으로 DStream 은 각 시점에 RDD 시퀀스이다. DStream 은 Flume, Kafka 또는 HDFS 같은 많은 Input 유형으로 부터 생성될 수 있다.

DStream 에는 두 가지 유형의 Operation 이 제공된다.

  • Transformation
    DStream 을 생성(파생)
  • Output
    외부 데이터 저장소에 결과 저장

기본적으로 RDD 에 제공되는 Operation 과 시간 기반의 Operation(예를 들면 sliding window) 을 제공한다.

Batch 프로그램과 다르게 Spark Streaming 은 24/7 을 수행하기 위해서 몇 가지 추가 설정이 필요하다.
HDFS 같은 저장소에 데이터를 저장하기 위해  Spark Streaming 이 제공하는 주요 메커니즘 인 checkpointing 과 Application 오류가 발생했을 경우 어떻게 재시작 하는지, 자동으로 재시작 할 수 있게 하기 위해 어떻게 설정하는지에 대해 알아보자.

A Simple Example

간단한 예제를 살펴보자.

시스템 Log Stream 을 받아서 error 를 포함하고 있는 line 만 출력해보자.

  1. StreamingContext 생성
  2. batch interval(1초) 설정(신규 데이터가 어떤 주기로 생성되는지)
  3. socketTextStream() 사용하여 DStream 생성
  4. “error” 를 포함한 라인만 선별(filter)
  5. prrint()

// Create a StreamingContext with a 1-second batch size from a SparkConf
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// Create a DStream from all the input on port 7777
JavaDStream<String> lines = jssc.socketTextStream("localhost", 7777);
// Filter our DStream for lines with "error"
JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() {
  public Boolean call(String line) {
    return line.contains("error");
  }});
// Print out the lines with errors
errorLines.print();

위 코드는 데이터를 받았을 때, 어떤 동작을 할 지 정의한 것이다. 데이터를 받기 시작하기 위해서는 StreamingContext.start() 함수를 호출해 주어야 한다. 그런 다음 Spark Streaming 이 Spark Job 을 시작하는데, 이는 각각의 Thread 로 동작한다. 따라서 Application 이 종료되지 않고 연산을 마칠 때까지 유지하기 위해서 SparkContext.awaitTermination() 을 호출해 주어야 한다.

// Start our streaming context and wait for it to "finish"
jssc.start();
// Wait for the job to finish
jssc.awaitTermination();

StreamingContext 는 한 번만 시작될 수 있으며, DStream 이나 Output 설정이 완료된 뒤 시작되어야 한다.

위에서 만든 예제를 돌려보자.

spark-submit --class com.oreilly.learningsparkexamples.scala.StreamingLogInput \
$ASSEMBLY_JAR local[4]

nc localhost 7777   # Lets you type input lines to send to the server
<your input here>

* 위도우 사용자는 nc 대신에 ncat 을 사용해야 한다.

Architecture and Abstraction

 Spark Streaming 은 “micro-batch” 구조를 사용한다. 이는 작은 데이터 묶음을 연속된 Batch Series 처럼 데이터를 처리한다.

정해진 주기마다 새로운 Batch 가 생성되는데, 각 주기 내에서

  • 시작부분
    새로운 Batch 를 생성하고, 들어온 데이터를 Batch 에 추가한다.
  • 중간부분
    Batch Process
  • 끝부분
    Batch 진행이 완료된다.

Batch 주기는 보통 500 micro second 에서 수 초 정도로 설정한다. 

Spark Streaming 구조는 아래와 같다.


Spark Streaming 프로그래밍은 Stream 내에 일부 데이터로 구성된 RDD Sequence 인 Discretized Stream 또는 DStream 를 처리하는 것이다.



DStream 은 외부 Input Source 로 부터 생성하거나, 기존 DStream 을 변환(Transformation) 하여 생성할 수 있다. 앞에서도 설명했듯이, 기존 RDD 에서 지원하는 Transformation Operation 물론이고, DStream 은 시간 범위 내에 데이터를 집계할 수 있는 “Stateful Transformation” 도 제공한다.

위 Simple Example 에서는 Socket 을 통해서 받은 데이터로부터 DStream 을 생성하고 Filter 함수를 적용하였다.



예제를 실행시켜보면 아래와 같은 결과를 볼 수 있다.

-------------------------------------------
Time: 1413833674000 ms
-------------------------------------------
71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] "GET /error78978 HTTP/1.1" 404 505
...

-------------------------------------------
Time: 1413833675000 ms
-------------------------------------------
71.19.164.174 - - [24/Sep/2014:22:27:10 +0000] "GET /error78978 HTTP/1.1" 404 505
...

“micro-batch” 구조의 Spark Streaming 을 잘 표현해 주고 있는 결과 로그이다. Batch 주기(샘플에서는 1초) 별로 해당 로그가 출력된다. 이렇듯 Spark Streaming 은 많은 작은 Job 들로 구성이 되는데, 이를 Spark UI 에서 확인할 수 있다.



Transformation Operation 외에도 DStream 은 print() 같은 Output Operation 도 제공한다. 이는 RDD 의 Action Operation 과 유사한데, 각 Batch 에서 주기적으로 실행이 되며, Batch 결과로 처리된다.

Driver-Worker 구조에서 Spark Streaming 수행은 아래와 같이 일이난다. 각 Input 데이터를 받기 위해 receiver 를 실행한다. 이 receiver 는 Executor 와 함께 수행되는데, Input Source 로부터 데이터를 수집하고, 이를 RDD 로 저장하는 역할을 한다. Fault Tolerance(유사시) 를 위해 데이터를 받아서 다른 Executor 에 복사를 해 둔다. 이 데이터는 Cached RDD 처럼 Executor 에 저장된다. Driver Program 의 StreamingContext 는 데이터를 처리하고 이전 Time 의 RDD 와 합치기 위해 주기적으로 Spark Job 을 수행한다.


위에서 잠깐 언급했듯이, Spark Streaming 은 Fault Tolerance 를 위해 Input Source 를 복제를 해 둔다. 그래서 특정 Worker 노드가 실패하더라도 복제된 데이터로 복구가 가능하다. 그러나 Lineage(계보) 를 이용해서 Application 이 실행된 이후 생성된 데이터를 다시 계산하는 것은 상당한 시간이 소요된다. 그래서 Spark Streaming 은 주기적으로 파일시스템(HDFS 나 S3) 에 저장할 수 있는 checkpointing 이라는 mechanism 을 이용한다. 보통 5~10 Batch 데이터를 저장하도록 설정한다.

Transformations

DStream 의 Transformation Operation 은 두가지로 구분될 수 있다.

  • Stateless
    이전 Batch 결과 데이터에 영향을 받지 않는 데이터를 처리
    map(), filter(), reduceByKey() 같은 Operation 들
  • Stateful
    이전 Batch 결과 데이터를 현재 Batch 결과를 만들기 위해 사용
    Sliding Window 나 기간 내 상태 추적 등이 있다.

Stateless Transformations

Function name Purpose Scala example Signature of user-supplied function on DStream[T]

map()

Apply a function to each element in the DStream and return a DStream of the result.

ds.map(x => x + 1)

f: (T) → U

flatMap()

Apply a function to each element in the DStream and return a DStream of the contents of the iterators returned.

ds.flatMap(x => x.split(" "))

f: T → Iterable[U]

filter()

Return a DStream consisting of only elements that pass the condition passed to filter.

ds.filter(x => x != 1)

f: T → Boolean

repartition()

Change the number of partitions of the DStream.

ds.repartition(10)

N/A

reduceByKey()

Combine values with the same key in each batch.

ds.reduceByKey((x, y) => x + y)

f: T, T → T

groupByKey()

Group values with the same key in each batch.

ds.groupByKey()

N/A


위 Operation 들이 전체 Stream 에 적용되는 것처럼 보이지만, 여러 RDD 로 구성된 각 DStream 내부적으로는 각 RDD 별로 구분되어 적용된다. 예를 들면 reduceByKey() 는 전체 기간에 대해서 적용되는 것이 아니라 각 Batch 별로 reduce 를 실행한다.

stateless transformation 은 각 단계별로 여러 개의 DStream 을 병합할 수 있다. 예를 들면 Key/Value DStream 는 RDD 처럼 Join 관련 Transformation 을 지원한다.
cogroup(), join(), leftouterjoin() 등등

DStream 은 RDD 내부에서 직접 동작할 수 있도록 하기 위해 transform() 이라는 좀 더 높은 레벨의 Operator 를 지원한다. transform() 은 DStream 에 적용하기 위한 RDD 사이에 모든 함수를 제공한다.  이 함수는 새로운 Stream 을 만들어 내기 위해 각 Data Batch 내에서 수행할 수 있다.
보통 transform() 함수는 RDD 에 적용한 처리를 재사용하기 위해 사용한다. 예를 들어 extractOutliers() 를 사용했다면, transform() 을 통해 재사용 할 수 있다는 것이다.(예제 참조)

val outlierDStream = accessLogsDStream.transform { rdd =>
  extractOutliers(rdd)
}
 
JavaPairDStream<String, Long> ipRawDStream = accessLogsDStream.transform(
  new Function<JavaRDD<ApacheAccessLog>, JavaRDD<ApacheAccessLog>>() {
    public JavaPairRDD<ApacheAccessLog> call(JavaRDD<ApacheAccessLog> rdd) {
      return extractOutliers(rdd);
    }
});

여러 개의 DStream 으로부터 데이터를 병합(combine) 또는 변환(transform) 한다면, StreamingContext.transform 또는 DStream.transformWith(otherStream, func) 을 사용할 수 있다.

Stateful Transformations

Stateful Transformation 은 Fault Tolerance 를 위해 checkpointing 을 활성화 해야 한다.

ssc.checkpoint("hdfs://...")

로컬에서 개발을 진행한다면, hdfs 가 아니라 local 경로를 사용해도 된다.

Windowed Transformcation

windowed operation 은 Batch 주기보다 긴 사간의 Streaming 결과를 처리할 수 있다.

windowed operation 은 두 개의 파라메터를 필요로 하는데, 이는 다음과 같다.

  • window duration
  • sliding duration



JavaDStream<ApacheAccessLog> accessLogsWindow = accessLogsDStream.window(
    Durations.seconds(30), Durations.seconds(10));
JavaDStream<Integer> windowCounts = accessLogsWindow.count();

window() 함수를 통해 windowed operation 을 수행할 수 있지만,  Spark Streaming 이 제공하는 몇 가지 효율적이고 편한 함수들이 있다.
먼저, reduceByWindow() 나 reduceByKeyAndWindow() 는 각 Window 내에서 좀 더 효율적으로 동작한다. 이런 함수들은 + 처럼 전체 window 에 대해서 하나의 reduce 함수를 갖는다. 그리고 reduce 작업을 점진적으로 수행 가능하도록 특정 형태를 갖는데, 이는 reduce 의 역함수인 -(+와 반대) 도 있다. 이렇게 역함수를 갖고 있으면 좀 더 유용하다.


위 그림처럼 window() 함수를 활용하면 좀 더 효율적으로 operation 을 수행할 수 있다.

class ExtractIp extends PairFunction<ApacheAccessLog, String, Long> {
  public Tuple2<String, Long> call(ApacheAccessLog entry) {
    return new Tuple2(entry.getIpAddress(), 1L);
  }
}
class AddLongs extends Function2<Long, Long, Long>() {
  public Long call(Long v1, Long v2) { return v1 + v2; }
}
class SubtractLongs extends Function2<Long, Long, Long>() {
  public Long call(Long v1, Long v2) { return v1 - v2; }
}

JavaPairDStream<String, Long> ipAddressPairDStream = accessLogsDStream.mapToPair(
  new ExtractIp());
JavaPairDStream<String, Long> ipCountDStream = ipAddressPairDStream.
  reduceByKeyAndWindow(
  new AddLongs(),      // Adding elements in the new batches entering the window
  new SubtractLongs()
  // Removing elements from the oldest batches exiting the window
  Durations.seconds(30),  // Window duration
  Durations.seconds(10)); // Slide duration

Data Counting 을 할 때, DStream 에서는 countByWindow()와 countByValueAndWindow() 를 제공한다.

  • countByWindow - 각 window 내에 Element 들의 갯수(Unique Element)
  • countByValueAndWindow - 각 window 내에 value 갯수(Unique Value)

JavaDStream<String> ip = accessLogsDStream.map(
  new Function<ApacheAccessLog, String>() {
    public String call(ApacheAccessLog entry) {
      return entry.getIpAddress();
    }});
JavaDStream<Long> requestCount = accessLogsDStream.countByWindow(
  Dirations.seconds(30), Durations.seconds(10));
JavaPairDStream<String, Long> ipAddressRequestCount = ip.countByValueAndWindow(
  Dirations.seconds(30), Durations.seconds(10));


UpdateStateByKey Transformations

때로는 DStream 내 여러 Batch 들을 하나의 state 로 관리할 필요가 있을 때 유용하다. updateStateByKey() 함수는 Key/Value 형태의 DStream 에서 상태변수를 갱신할 수 있게 한다. 
예를 들어 웹서버 로그에서 UserId 가 Key 인 특정 사이트 접속 이벤트가 있다고 가정하자. UpdateStateByKey() 를 사용하면, 각 사용자가 접근한 최근 10 페이지를 추적할 수 있다.  이 페이지 목록은 상태(State) 정보이고, 각 Event 가 들어올 때마다 갱신된다.

updateStateByKey() 를 사용하기 위해 update(events, oldState) => newState 를 제공하는데, 각 항목은 아래와 같다.

  • events - 현재 Batch 에서 들어오는 Event 목록
  • oldState - Optional 한 상태 정보이며, Option 형태로 저장된다. 만약 값이 없다면 해당 Key 에 대한 이전상태가 없음을 의미한다.
  • newState - 함수의 반환 값이며, Option 형태이다. 상태를 삭제하고자 할 때, 빈 Option 을 반환할 수도 있다.

updateStateByKey() 의 반환값은 RDD(key, state) pair 를 포함하는 DStream 이다.

아래 예제는 로그 메시지에서 HTTP 응답 코드별 count 값을 계산하는 예제이다.

class UpdateRunningSum implements Function2<List<Long>,
    Optional<Long>, Optional<Long>> {
  public Optional<Long> call(List<Long> nums, Optional<Long> current) {
    long sum = current.or(0L);
    return Optional.of(sum + nums.size());
  }
};

JavaPairDStream<Integer, Long> responseCodeCountDStream = accessLogsDStream.mapToPair(
    new PairFunction<ApacheAccessLog, Integer, Long>() {
      public Tuple2<Integer, Long> call(ApacheAccessLog log) {
        return new Tuple2(log.getResponseCode(), 1L);
    }})
  .updateStateByKey(new UpdateRunningSum());


Output Operation

Output Operation 이란 Stream 에서 최종 데이터에 대한 특정 처리를 지칭한다. (예를 들어 DB 에 데이터를 저장하거나, 스크린에 출력해주거나)
이전에도 언급된 print() 함수는 debug output operation 으로 보통 사용된다. 이는 DStream 의 각 Batch 마다 10개 항목을 출력해 준다.

Debugging 을 할 때, 결과를 저장하기 위해서 Output Operation 을 사용하기도 하는데, DStream 에도 save() 같은 함수가 있다. 

ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")

디렉토리와 suffix(Optional) 을 파라미터로 받는다.

Hadoop OutputFormat 을 사용하기 위해 saveAsHadoopFiles() 를 사용한다. 그러니 Spark Streaming 은 saveAsSequenceFiles() 를 지원하지 않지만, 아래와 같이 SequenceFiles 도 저장할 수 있다.

JavaPairDStream<Text, LongWritable> writableDStream = ipDStream.mapToPair(
  new PairFunction<Tuple2<String, Long>, Text, LongWritable>() {
    public Tuple2<Text, LongWritable> call(Tuple2<String, Long> e) {
      return new Tuple2(new Text(e._1()), new LongWritable(e._2()));
  }});
class OutFormat extends SequenceFileOutputFormat<Text, LongWritable> {};
writableDStream.saveAsHadoopFiles(
  "outputDir", "txt", Text.class, LongWritable.class, OutFormat.class);

마지막으로 froeachRDD() 도 일반적인 Output Operation 에 해당된다. transform() 처럼 각 RDD 를 접근할 수 있고, Spark 가 지원하는 모든 Action Operation 을 사용할 수 있다. 예를 들면 MySql 같은 DB 에 저장할 수 있다. Spark 는 saveAs() 같은 함수는 없지만, foreachPartition() 같은 함수로 내용을 저장할 수 있다.

ipAddressRequestCount.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Open connection to storage system (e.g. a database connection)
    partition.foreach { item =>
      // Use connection to push item to system
    }
    // Close connection
  }
}


Input Sources

Spark Streaming 은 기본적으로 여러가지 데이터 Source 를 갖고 있다. 몇몇 source 들은 Maven Artifact 에 포함되어 있고, 나머지 source 들은 spark-streaming-kafka 처럼 별도의 Maven Artifact 로 제공된다.

Core Sources

  • socket
  • file
  • akka actors

Additional Source

  • Apache Kafka
    import org.apache.spark.streaming.kafka.*;
    ...
    // Create a map of topics to number of receiver threads to use
    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("pandas", 1);
    topics.put("logs", 1);
    JavaPairDStream<String, String> input =
      KafkaUtils.createStream(jssc, zkQuorum, group, topics);
    input.print();
  • Apache Flume

    • push-based receiver
      receiver 는 Apache Flume 이 데이터를 전달해주는 Sink 처럼 동작
      a1.sinks = avroSink
      a1.sinks.avroSink.type = avro
      a1.sinks.avroSink.channel = memoryChannel
      a1.sinks.avroSink.hostname = receiver-hostname
      a1.sinks.avroSink.port = port-used-for-avro-sink-not-spark-port
      
      JavaDStream<SparkFlumeEvent> events = FlumeUtils.createStream(ssc, receiverHostname,
                                            receiverPort)
      
    • poll-based receiverreceiver 는 Custom Sink 로부터 데이터를 가져 올 수 있음
      a1.sinks = spark
      a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
      a1.sinks.spark.hostname = receiver-hostname
      a1.sinks.spark.port = port-used-for-sync-not-spark-port
      a1.sinks.spark.channel = memoryChannel
      
      JavaDStream<SparkFlumeEvent> events = FlumeUtils.createPollingStream(ssc,
        receiverHostname, receiverPort)
      

Multiple Sources and Cluster Sizing

DStream 에 union() 을 이용하여 여러 Source 로 부터 받은 데이터를 합칠 수 있다.
간혹 Multiple Receiver 데이터 통합 성능을 위해 필요할 수 있다.(Single Receiver 를 사용하는 경우 병목현상이 발생할 수도 있다.)

24/7 Operation

Spark Streaming 에서 가장 큰 장점 중 하나는 Fault Tolerance(고장방지) 보장이다. input 데이터가 저장된 뒤 Spark Streaming 은  Worker 나 Driver 가 죽더라도 항상 정확한 결과를 내 놓는다.

24/7 을 수행하려면 좀 특별할 설정이 필요하다. 먼저 HDFS 나 S3 에 checkpointing 설정을 해야한다. 그리고 drvier 프로그램이나 이상한 input 데이터에 대한 별도 처리 코드가 필요하다.

Checkpointing

Checkpointing 은 Spark Streaming 에서 Fault Tolerance(고장방지) 를 위한 주요 메커니즘이나.  이는 데이터 복구를 위해서 주기적으로 HDFS 나 S3 같은 저장소에 데이터를 저장한다. Checkpointing 에는 두 가지 목적이 있는데, 아래와 같다.

  • 오류가 발생했을 때, 다시 RDD 를 계산하는 것을 방지한다.
  • Driver 에 대한 Fault Tolerance 를 보장한다.

이런 이유로 Checkpointing 을 설정하는 것은 아주 중요하다.

ssc.checkpoint("hdfs://...")

local 모드로 사용할 때에는 Checkpointing 을 위해 local 파일시스템 경로로 설정해도 된다. 그러나 production level 에서는 복제기능을 제공하는 HDFS 나 S3 를 사용해야 한다.

Driver Fault Tolerance

Drvier node 의 Fault Tolerance 를 위해서는 StreamingContext 를 좀 다른 방법으로 생성해야 한다. StreamingContext.getOrCreate() 를 사용하면 된다.

JavaStreamingContextFactory fact = new JavaStreamingContextFactory() {
  public JavaStreamingContext call() {
    ...
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Create a StreamingContext with a 1 second batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
    jssc.checkpoint(checkpointDir);
    return jssc;
  }};
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, fact);

최초 실행시에는 checkpoint dir 이 존재하지 않기 때문에 StreamingContext 를 생성하게 되고, Driver 오류로 죽은 뒤 재실행 될 때에는 checkpoint dir 을 이용하여 생성하게 된다.

대부분의 Cluster Manager 는 Driver 가 죽었을 때 자동적으로 재시작해 주지 않는다. 따라서 monit 같은 툴을 이용하여 모니터링을 하고, 상황에 맞게 재시작을 해 주어야 한다. 이 때 가장 좋은 방법은 Standalone Cluster Manager 에서 지원하는 --supervise 를 사용하는 것이다. 이는 Spark 가 Driver 를 재시작할 수 있게 해준다. 또한 --deploy-mode cluster 옵션을 통해 Driver 가 local 에서 수행되는 것이 아니라 Cluster 내에서 수행될 수 있도록 해야한다.

./bin/spark-submit --deploy-mode cluster --supervise --master spark://... App.jar

이런 옵션을 사용할 때, Spark Standalone Master 에 대해서도 Fault Tolerance 를 지원하고 싶을 때가 있다. 이 때는 Zookeeper 를 사용하면 된다.

Worker Fault Tolerance

Worker node 를 위해서는 Spark Streaming 과 Spark 는 동일하다. 외부로부터 받은 모든 데이터를 Worker node 에 복사를 해 두는 것이다.

Receiver Fault Tolerance

Receiver 를 갖고 있는 Worker node 를 위해서는 추가적으로 고려해야 할 사항들이 있다. Spark Streaming 은 죽은 Receiver 를 다른 node 에서 재시작하게 된다. 그러나 이 때는 수신한 모든 데이터를 잃게 된다. Flume 을 예로 들면, 두 종류의 Receiver 에 대해서 중요한 차이점은 데이터 손실 보장에 있다.

  • receiver-pull-from-sink model
    Spark 는 복제한 항목을 그냥 제거한다.
  • push-to-receiver model
    데이터를 복제하기 전에 receiver 가 죽으면 데이터 손실이 있을 수 있다.

일반적으로 어떤 receiver 든 데이터 손실을 보장하기 위해서는 fault-tolerance properties 를 잘 설정해야 한다.

Receiver 는 두 가지 사항을 보장한다.

  • 모든 데이터는 복제되어 있기 때문에 복구가 가능하다.(checkpoint)
  • kafka, push-based flume, twitter 같은 source 에서 받은 데이터를 다른 노드에 복사해 둔다. 단, Receiver 가 죽으면 데이터 손실이 있을 수 있다.

Processing Guarantees

Spark Streaming 의 Fault Tolerance 보장 때문에 Spark 는 모든 Transformation 에 대해 exactly-once semantics(정확히 한번만 수행하는 것)을 제공한다. Worker 가 죽더라도 결과는 항상 동일하다.
그러나 변환 결과를 output operation 을 통해 외부에 저장할 때에는 실패가 발생할 경우 여러번 수행될 수 있다. 그래서 시스템 적으로 이런 경우를 처리할 수 있도록 만들어야 한다. 예를 들면 transaction 처리를 할 수 도 있고, 여러 번 갱신되더라도 결과는 동일하게 구조를 가져갈 수 도 있다.(멱등성)

Streaming UI

Spark 는 Streaming UI 를 제공한다. 일반적으로 http://<driver>:4040 으로 접속할 수 있다.


Performance Considerations

Batch and Window Sizes

일반적으로 Batch Size 는 500 ms 로 설정한다. 처음에는 약 10초정도로 크게 설정하고 줄여가면서 적당한 시간을 찾는 것이 좋다.
비슷한 방법으로 Window Operation 에서는 결과를 산출해내는 주기가 성능에 큰 영향을 준다. 만약 무거운 연산을 한다면 이 주기값을 늘려야 한다.

Level of Parallelism

각 Batch 의 처리시간을 줄이는 방법은 병렬 처리 수준을 높이는 것이다. 병렬 처리 수준을 높이는 3가지 방법을 소개한다.

  • Receiver 갯수를 늘려라.
  • 수신한 데이터를 repartition 하라.
  • aggregation 수행시 병렬수준을 높혀라.

Garbage Collection and Memory Usage

Java 의 CMS(Concurerent Mark-Sweep) GC 를 사용함으로서 STW(Stop The World) 시간을 줄일 수 있다.

spark-submit --conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC App.jar

추가적으로 RDD 를 cache 할 때 직렬화된 형태로 저장한다면 GC 부담을 덜 수 있다. Kryo 직렬화를 사용한다면 좀 더 GC 부담을 줄일 수 있을 것이다.
Spark 는 Memory 제거 방식을 설정할 수 있는데, 기본적으로는 LRU 알고리즘을 적용하지만, spark.cleaner.ttl 속성을 이용하여 특정 기간이 지난 데이터에 대해서는 제거할 수 있도록 설정할 수도 있다.
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함