BigData/Spark
Learning Spark Chapter. 6 Spark 프로그래밍 고급편
Tomining
2015. 7. 23. 21:03
Accumulators
map() 이나 filter() 함수를 사용할 때, driver program(Spark Application) 에서 정의한 변수들을 전달한다.
하지만 실제로 worker node 에서 task 들이 수행될 때 각 변수들의 copy 본을 갖게 되고, 이 copy 본을 갱신 등 조작하게 된다.
이 때 driver program 에서는 이 변수들의 변경사항을 확인할 수 없는데, Accumulators 와 Broadcast 변수를 통해 데이터를 공유할 수 있다.
package com.tomining.spark.tutorial.example.ch6;
import java.io.Serializable; import java.util.Arrays; import org.apache.commons.lang3.StringUtils; import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; public class ShareAccumulator implements Serializable { public void execute(String inputFile, String outputFile) { SparkConf conf = new SparkConf().setMaster("local").setAppName("ShareAccumulator"); JavaSparkContext sc = new JavaSparkContext(conf); final Accumulator<Integer> acc = sc.accumulator(0); JavaRDD<String> rdd = sc.textFile(inputFile); rdd.flatMap(new FlatMapFunction<String, String>() { public Iterable<String> call(String t) throws Exception { if (StringUtils.isEmpty(t)) { acc.add(1);; } return (Iterable<String>) Arrays.asList(t.split(" ")).iterator(); } }); rdd.saveAsTextFile(outputFile); System.out.println("Blank lines : " + acc.value()); } }
|
Accumulator 는 위 샘플 코드처럼 사용할 수 있다.
Accumulator 는 Spark 가 수행될 때, 각 worker node 에 복사본을 전달하고 Spark Application 종료 시점에 각 worker node 로부터 수집하여 결과를 생성한다.
이 때 특정 worker node 가 crash 가 발생한더라도 실패한 Task 가 재수행(새로운 worker node 할당) 되면서 cache 된 값을 사용하기 때문에 문제가 없다.
그렇다면 Spark Application 내에서 Accumulator 는 어떻게 동작할까?
Spark Operation 종류에 따라 조금 차이가 있다.
- action
각 Task 내에서 단 1번만 갱신된다. - transformation
RDD 의 각 row 가 접근될 때마다 갱신된다.
따라서 debug 용으로만 사용하길 권장한다.
Spark 는 Accumulator 데이터 타입으로 Integer 외에도 Double, Long, Float 를 지원한다.
또한 사용자가 정의한 타입에 대해서도 적용이 가능하다.
Broadcasts
Broadcast variables 은 Driver Program 이 Spark Operation 에서 사용할 수 있도록 각 worker 노드에 큰 읽기전용 데이터를 효율적으로 보낼 수 있게 해 준다.
Spark는 Closure 내 모든 변수(값)들을 worker 노드들에게 자동으로 보낸다. 이는 효율적이기도 하지만 2가지 이유로 비효율적일 수 있다.
- 기본 Task 들의 수행 mechanism 은 작은 Task 에 최적화 되어 있다.
- 여러 개의 parallel operation 에서 동일한 값을 사용할텐데, Spark 는 각 Operation 에 따로 전송한다.
P.104 에 나오는 예제를 살펴보자.
해당 프로그램을 실행할 때, 만약 signPrefixes 값이 큰 Array라면, 각 Task 들에게 해당 정보를 보내는 것은 비효율적일 수 있다.(아마 signPrefixes는 수 MB 정도 될 것이다.)
또한 이를 재사용하고자 할 때는 또 전송해야 한다. 이 때 Broadcase variable 을 사용할 수 있다.
signPrefixes = sc.broadcast(loadCallSignTable());
|
위 처럼 signPrefixes 를 Broadcast<T> 타입으로 생성하고, value(java에서는 value()) 를 통해서 데이터에 접근할 수 있다.
이는 read-only 데이터로 각 worker nodes 에서 변경할 수 없으며, 변경되지 않는다.
(java 에서는 final 로 선언하길 권장한다)
Broadcast 최적화 하기
큰 데이터를 Broadcasting 할 때, data serialization 을 잘 선택하는 것이 중요하다. 그 이유는 network 를 통해 Broadcast 데이터를 각 worker 노드에 보내는 시간이 병목이 될 수 있다. Spark 는 기본적으로 java.io.Serializable 을 사용한다. java.io.Serializable 은 Primitive Type 을 제외하고는 아주 비효율적이다. spark.serializer 속성(SparkConf에 설정)을 통해서 다른 Serializer 를 선택할 수 있다.
Spark 에서는 Kyro 라는 빠른 Serialzer 를 제공한다.(책 8장에서 다룬다.)
파티션 기반 작업
파티션 기반으로 데이터를 처리한다면, 각 데이터마다 준비작업을 다시 수행하는 것을 막을 수 있다. 예를 들면, DB 연결 작업이나, Random Number 생성 같은 작업이 있다. 파티션 기반의 map() 이나 foreach() 함수를 사용하면 이런 비용들을 줄일 수 있다.
- mapPartition
- mapPartitionWithIndex
- foreachPartition
외부 프로그램 수행하기(pipe)
Scala/Python/Java 외에도 다른 언어로 작성된 Spark Application 을 수행할 수 있다. 이 때 사용되는 것이 Spark 의 pipe() 이다.
책 109 페이지를 살펴보면 R Script 를 수행하는 예제가 나온다.
(단, Spark 1.4.0 버전부터는 R Script 를 수행할 수 있도록 기본적으로 기능에 제공된다. 더 이상 pipe 를 사용하기 위한 모듈을 직접 만들 필요가 없어졌다.)
Numeric RDD Operation
Spark 의 Numeric Operation 에는 RDD 모델을 생성하면서 함께 계산될 수 있는 Streaming Algorithm 이 구현되어 있다. RDD 의 통계정보가 함께 계산되며, stats() 함수를 통해 StatsCounter 라는 객체로 반환된다. StatsCounter 에서 제공되는 API 는 아래와 같다.
- count()
- mean()
- sum()
- max()
- min()
- variance() -> 분산
- sampleVariance()
- stdev() -> 표준편차
- sampleStdev()