본문 바로가기
BigData/Spark

Learning Spark Chapter. 3 RDD 프로그래밍

by Tomining 2015. 7. 3.
RDD 란?
     An RDD in Spark is simply an immutable distributed collection of objects.
     스파크에서 RDD 란 수정불가능한 분산 Collection 이다. 각 RDD 는 여러 node에서 다룰 수 있도록 여러 조각으로 나눠서 처리된다.

RDD 기본.
     RDD를 생성할 수 있는 방법에는 두 가지가 있다.
  1. 외부 데이터 셋을 로딩
    sc.textFile(“README.md”) 처럼 local storage 나 HDFS 에서 파일을 읽어 RDD 로 생성
  2. 기존 데이터 Collection 을 기반으로 생성
    sc.parallelize(Arrays.asList(“test”, “test2”)); 처럼 기존 데이터로 생성

RDD 에는 두 종류의 operation 을 가진다.
  • Transformation(변환)
    대표적인 것이 filter() 이다. RDD 에서 특정 조건에 맞는 데이터만 골라 낼 때 사용한다.
    Transformation 의 특징은 Lazy Evaluation 이다. filter 라는 함수가 호출되었을 때 실제로 RDD 의 데이터를 읽지 않는다.

package com.tomining.spark.tutorial.example;

import java.io.Serializable;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class FilterWord implements Serializable {
public void execute(final String filterWord) {
SparkConf
conf = new SparkConf().setMaster("local").setAppName("FilterWord");
JavaSparkContext
sc = new JavaSparkContext(conf);

JavaRDD<String>
lines = sc.textFile("REAMME.md");
JavaRDD<String>
words = lines.filter(new Function<String, Boolean>() {
public Boolean call(String source) throws Exception {
return StringUtils.contains(source, filterWord);
}
});
}
}
          
          filter() 수행시 REAME.md 파일을 실제로 읽지 않는다. filter() 결과인 words RDD 도 실제 데이터 형태로 남아 있는 것이 아니다.

  • Action(작업)
    Driver Program 의 결과가 되거나 외부 저장소에 결과정보를 작성하는 Operation 이다.
    filter() 함수를 사용한 예제를 좀 더 확장해보자.

package com.tomining.spark.tutorial.example;

import java.io.Serializable;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class FilterWord implements Serializable {
public void execute(final String filterWord) {
SparkConf
conf = new SparkConf().setMaster("local").setAppName("FilterWord");
JavaSparkContext
sc = new JavaSparkContext(conf);

JavaRDD<String>
lines = sc.textFile("REAMME.md");
JavaRDD<String>
words = lines.filter(new Function<String, Boolean>() {
public Boolean call(String source) throws Exception {
return StringUtils.contains(source, filterWord);
}
});

System.
out.println(words.count());
}
}

          count() 할수를 호출했을 때 비로소 file 내용을 읽고 filter() 연산을 실제로 수행한다.
          
          take() 나 collect() 함수도 있다. 다만 아주 큰 데이터에서는 collect() 사용을 자제하는 것이 좋다.
          이유는 이를 단일 장비의 memory 로 내리는 역할을 하기 때문에 성능이 느려질 수 있다.
          결과를 파일로 작성하고자 할 때에는 saveAsTextFile() 를 이용하면 된다.

Spark 의 Lazy Evaluation 은 RDD operation 중 Transformation 함수가 호출 되었을 때 실제로 동작하지 않음을 의미한다. 즉, Action Operation 이 호출될 때까지 실제로 파일을 읽어서 데이터 셋을 만들어 내지 않음을 의미한다. 이는 Hadoop 의 Map&Reduce 에서 operation 을 단순화 하기 위한 작업과는 대조적이다. Lazy Evaluation 특징으로 인해 Spark 는 간단한 operation 을 여러 번 사용하면서도 성능 이슈는 생각할 필요가 없게 되었다.

일반적으로 사용되는 RDD operation 에 대해서 좀 더 알아보고자 한다면 아래 Spark 공식 가이드 문서를 참고하자.

[참고사항]
distinct() 함수는 비용이 비한 Operation 이다. intersection(), substract() 또한 비싼 함수이다. 
그 이유는 내부적으로 네트웍 상에서 데이터 shuffling 이 필요하기 때문이다.

RDD 는 기본적으로 Action operation 을 호출하기 이전까지는 별도의 작업을 하지 않는다고 설명하였다. 때에 따라서는 중간 RDD 를 재사용하고 싶을 때가 있을 것이다. 이 때 사용할 수 있는 것이 persist() 이다. cache() 함수와 유사한 동작을 한다. 아래 spark 공식 가이드 문서를 참고하자.
persist() 시에 StorageLevel 을 지정할 수 있다. 각 level 에 대한 용도를 확인하자.