본문 바로가기
BigData/Spark

Learning Spark Chapter. 6 Spark 프로그래밍 고급편

by Tomining 2015. 7. 23.
Accumulators

map() 이나 filter() 함수를 사용할 때, driver program(Spark Application) 에서 정의한 변수들을 전달한다.
하지만 실제로 worker node 에서 task 들이 수행될 때 각 변수들의 copy 본을 갖게 되고, 이 copy 본을 갱신 등 조작하게 된다.
이 때 driver program 에서는 이 변수들의 변경사항을 확인할 수 없는데, Accumulators 와 Broadcast 변수를 통해 데이터를 공유할 수 있다.

package com.tomining.spark.tutorial.example.ch6;

import java.io.Serializable;
import java.util.Arrays;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class ShareAccumulator implements Serializable {
public void execute(String inputFile, String outputFile) {
SparkConf
conf = new SparkConf().setMaster("local").setAppName("ShareAccumulator");
JavaSparkContext
sc = new JavaSparkContext(conf);

final Accumulator<Integer> acc = sc.accumulator(0);
JavaRDD<String>
rdd = sc.textFile(inputFile);
rdd.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String t) throws Exception {
if (StringUtils.isEmpty(t)) {
acc.add(1);;
}

return (Iterable<String>) Arrays.asList(t.split(" ")).iterator();
}
});

rdd.saveAsTextFile(outputFile);
System.
out.println("Blank lines : " + acc.value());
}
}

Accumulator 는 위 샘플 코드처럼 사용할 수 있다.

Accumulator 는 Spark 가 수행될 때, 각 worker node 에 복사본을 전달하고 Spark Application 종료 시점에 각 worker node 로부터 수집하여 결과를 생성한다.
이 때 특정 worker node 가 crash 가 발생한더라도 실패한 Task 가 재수행(새로운 worker node 할당) 되면서 cache 된 값을 사용하기 때문에 문제가 없다.

그렇다면 Spark Application 내에서 Accumulator 는 어떻게 동작할까?
Spark Operation 종류에 따라 조금 차이가 있다.
  • action
    각 Task 내에서 단 1번만 갱신된다.
  • transformation
    RDD 의 각 row 가 접근될 때마다 갱신된다.
    따라서 debug 용으로만 사용하길 권장한다.

Spark 는 Accumulator 데이터 타입으로 Integer 외에도 Double, Long, Float 를 지원한다.
또한 사용자가 정의한 타입에 대해서도 적용이 가능하다.

Broadcasts

Broadcast variables 은 Driver Program 이 Spark Operation 에서 사용할 수 있도록 각 worker 노드에 큰 읽기전용 데이터를 효율적으로 보낼 수 있게 해 준다.

Spark는 Closure 내 모든 변수(값)들을 worker 노드들에게 자동으로 보낸다. 이는 효율적이기도 하지만 2가지 이유로 비효율적일 수 있다.

  1. 기본 Task 들의 수행 mechanism 은 작은 Task 에 최적화 되어 있다.
  2. 여러 개의 parallel operation 에서 동일한 값을 사용할텐데, Spark 는 각 Operation 에 따로 전송한다.

P.104 에 나오는 예제를 살펴보자.
해당 프로그램을 실행할 때, 만약 signPrefixes 값이 큰 Array라면, 각 Task 들에게 해당 정보를 보내는 것은 비효율적일 수 있다.(아마 signPrefixes는 수 MB 정도 될 것이다.)
또한 이를 재사용하고자 할 때는 또 전송해야 한다. 이 때 Broadcase variable 을 사용할 수 있다.

signPrefixes = sc.broadcast(loadCallSignTable());

위 처럼 signPrefixes 를 Broadcast<T> 타입으로 생성하고, value(java에서는 value()) 를 통해서 데이터에 접근할 수 있다.

이는 read-only 데이터로 각 worker nodes 에서 변경할 수 없으며, 변경되지 않는다.
(java 에서는 final 로 선언하길 권장한다)

Broadcast 최적화 하기

큰 데이터를 Broadcasting 할 때, data serialization 을 잘 선택하는 것이 중요하다. 그 이유는 network 를 통해 Broadcast 데이터를 각 worker 노드에 보내는 시간이 병목이 될 수 있다. Spark 는 기본적으로 java.io.Serializable 을 사용한다. java.io.Serializable 은 Primitive Type 을 제외하고는 아주 비효율적이다. spark.serializer 속성(SparkConf에 설정)을 통해서 다른 Serializer 를 선택할 수 있다.
Spark 에서는 Kyro 라는 빠른 Serialzer 를 제공한다.(책 8장에서 다룬다.) 

파티션 기반 작업

파티션 기반으로 데이터를 처리한다면, 각 데이터마다 준비작업을 다시 수행하는 것을 막을 수 있다. 예를 들면, DB 연결 작업이나, Random Number 생성 같은 작업이 있다. 파티션 기반의 map() 이나 foreach() 함수를 사용하면 이런 비용들을 줄일 수 있다.

  • mapPartition
  • mapPartitionWithIndex
  • foreachPartition


외부 프로그램 수행하기(pipe)

Scala/Python/Java 외에도 다른 언어로 작성된 Spark Application 을 수행할 수 있다. 이 때 사용되는 것이 Spark 의 pipe() 이다.
책 109 페이지를 살펴보면 R Script 를 수행하는 예제가 나온다.
(단, Spark 1.4.0 버전부터는 R Script 를 수행할 수 있도록 기본적으로 기능에 제공된다. 더 이상 pipe 를 사용하기 위한 모듈을 직접 만들 필요가 없어졌다.)

Numeric RDD Operation

Spark 의 Numeric Operation 에는 RDD 모델을 생성하면서 함께 계산될 수 있는 Streaming Algorithm 이 구현되어 있다. RDD 의 통계정보가 함께 계산되며, stats() 함수를 통해 StatsCounter 라는 객체로 반환된다. StatsCounter 에서 제공되는 API 는 아래와 같다.

  • count()
  • mean()
  • sum()
  • max()
  • min()
  • variance() -> 분산
  • sampleVariance()
  • stdev() -> 표준편차
  • sampleStdev()