티스토리 뷰

key/value 로 구성된  RDD 에 대해서 알아보자.

먼저 PairRDD 는 아래와 같이 만들 수 있다.

package com.tomining.spark.tutorial.example;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class PairRDDExample implements Serializable {
public void execute() {
SparkConf
conf = new SparkConf().setMaster("local").setAppName("FilterWord");
JavaSparkContext
sc = new JavaSparkContext(conf);

JavaRDD<String>
lines = sc.textFile("REAMME.md");
lines.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String x) {
return new Tuple2<String, String>(x.split(" ")[0], x);
}
});

sc.close();
}
}

만약 메모리로부터 PairRDD 를 만드려면 sc.parallelizePairs() 를 사용할 수 있다.

상세 operation 들은 공식 가이드 문서에서 확인할 수 있다.

자바 api 문서는 아래와 같다.

가이드 문서를 확인하면 알겠지만, PairRDD 또한 RDD 의 일종으로 기본 operation 은 다 지원한다.
예를 들면 filter() 가 있다.
RDD 의 map() 처럼 값을 변경하고자 한다면, PairRDD 의 mapValues 를 이용할 수 있다.



Transformation Operations

Aggregations
RDD 의 fold(), combine(), reduce() 역할을 하는 foldByKey(), combineByKey(), reduceByKey() 가 있다.

combineByKey() 를 살펴보자.

combineByKey() 는 3가지 파라미터를 받는다.
  • create
  • merge
  • combine

create/merge 의 경우는 동일 partition 에서 동작한다.
즉, 여러 worker node 에서 동작을 할 텐데, 각 worker node 에서 새로운 key 가 나왔을 때 create 가 호출되고, 기존에 나왔던 key라면 merge 가 호출된다.
최종적으로  여러 worker node 에서 집계된 데이터를 하나의 RDD 로 합칠 때 combine 이 호출된다.

아래 예제는 README.md 파일에서 각 라인의 첫 단어들로만 단어수와 평균을 계산하는 코드이다.
예제라서 평균은 무조건 1로 계산된다.

package com.tomining.spark.tutorial.example.ch4;

import java.io.Serializable;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 * 단어별 count and 평균
 * @author tomining
 *
 */
public class PairRDDCombineExample implements Serializable {
public void execute() {
SparkConf
conf = new SparkConf().setMaster("local").setAppName("FilterWord");
JavaSparkContext
sc = new JavaSparkContext(conf);

JavaRDD<String>
lines = sc.textFile("REAMME.md");
JavaPairRDD<String, Integer>
nums = lines.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String x) {
return new Tuple2<String, Integer>(x.split(" ")[0], 1);
}
});

AvgCount
initial = new AvgCount(0, 0);
JavaPairRDD<String, AvgCount>
avgCounts = nums.combineByKey(createAcc, addAndCount, combine);
Map<String, AvgCount>
countMap = avgCounts.collectAsMap();
for (Entry<String, AvgCount> entry : countMap.entrySet()) {
System.
out.println(entry.getKey() + ":" + entry.getValue().avg());
}
}

Function<Integer, AvgCount>
createAcc = new Function<Integer, AvgCount>() {
public AvgCount call(Integer x) {
return new AvgCount(x, 1);
}
};

Function2<AvgCount, Integer, AvgCount>
addAndCount = new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) {
a.total += x;
a.num += 1;
return a;
}
};

Function2<AvgCount, AvgCount, AvgCount>
combine = new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total += b.total;
a.num += b.num;
return a;
}
};
}

class AvgCount implements Serializable {
int total;
int num;

public AvgCount(int total, int num) {
this.total = total;
this.num = num;
}

public float avg() {
return total/(float) num;
}
}

grouping data
Key 별로 Value를 grouping 할 때 groupByKey() 를 사용할 수 있다.
reduceByKey() 와 유사하게 동작 하긴 하지만 value들을 모두 list 형 자료형에 담기 때문에 성능이 느리다.
즉, value들의 값이 필요한 게 아니라면 reduceByKey를 사용하는 것이 좋다.

만약 여러 RDD 에 대해서 동일 key에 대한 grouping 을 하고자 할 때 cogroup() 함수를 사용할 수 있다.
단, cogroup() 는 join() 을 이용하여 구현이 가능한데, 3개 이상의 RDD 를 사용할 때 cogroup 을 사용하길 권장한다.

Join
RDBMS 에서 Join 기능과 동일하다.
inner join, left outer join, right outer join, full outer join 모두 지원한다. 기본은 inner join 이다.

sorting data
key 순으로 정렬을 하기 위해서 sortByKey() 함수를 사용할 수 있다.
오름차순 정렬이 기본값이다.

rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x : str(x))



Action Operations

RDD 와 동일하게 기본적인 Action 계열의 operation 들을 지원한다.

countByKey()
collectAsMap()
lookup(key)




Data Partitioning

join 같이 key 값을 기반으로 수행되는 operation 에서는 dataset 이 여러 번 access 되기 때문에 partitioning 을 사용하는 것이 유용하다.

Partition 에는 세 가지가 있다.
  • Hash Partition
  • Range Partition
  • Custom Partition

간단한 예를 들어 보자.
[UserID, UserData] , [UserID, Links] 로 구성된 두 개의  RDD 가 있다고 가정하자.
그리고 두 개의 RDD 를 join 한다고 해보자.(아래 코드 참조)

package com.tomining.spark.tutorial.example.ch4;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class RddPartition implements Serializable {
public void execute(String outputPath) {
SparkConf
conf = new SparkConf().setMaster("local").setAppName("FilterWord");
JavaSparkContext
sc = new JavaSparkContext(conf);

JavaRDD<String>
lines = sc.textFile("REAMME.md");
JavaPairRDD<String, Integer>
nums = lines.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String x) {
return new Tuple2<String, Integer>(x.split(" ")[0], 1);
}
});

JavaPairRDD<String, UserData>
userDataRDD = lines.mapToPair(new PairFunction<String, String, UserData>() {
public Tuple2<String, UserData> call(String x) {
return new Tuple2<String, UserData>(x.split(" ")[0], new UserData(x));
}
});
JavaPairRDD<String, Link>
linkDataRDD = lines.mapToPair(new PairFunction<String, String, Link>() {
public Tuple2<String, Link> call(String x) {
return new Tuple2<String, Link>(x.split(" ")[0], new Link(x));
}
});

userDataRDD.join(linkDataRDD)
.saveAsTextFile(
outputPath);

sc.close();
}
}

class UserData {
private String data;
public UserData(String data) {
this.data = data;
}
}

class Link {
private String data;
public Link(String data) {
this.data = data;
}
}

join 의 경우 transformation operation 의 하나로 연산 후 새로운 RDD 를 생성해 내게 되는데.
아래 그림에서 joined 라는 RDD 라고 할 수 있다.

join 시에 UserData 와 Links(events) 모두 key 값을 통해 hash partition 이 적용된다.
따라서 각 데이터 셋이 여러 worker node 에 분산 저장되어 있게 되는데, join 시 원격지에 있는 RDD Partition 의 데이터를 network 를 통해 가져오게 된다.
이 말은 UserData, LinkData, Joined 이 3개의 RDD 가 동일 장비에 있다고 보장하지 않는다는 것이다.
이렇게 그림의 실선처럼 network cost 가 많이 발생하게 되는데. 이는 참 비효율적이다.


이를 해결하기 위해 partitionBy() 를 활용하면 된다.(아래 붉은색 코드 참고)

package com.tomining.spark.tutorial.example.ch4;

import java.io.Serializable;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

public class RddPartitionBy implements Serializable {
public void execute(String outputPath) {
SparkConf
conf = new SparkConf().setMaster("local").setAppName("FilterWord");
JavaSparkContext
sc = new JavaSparkContext(conf);

JavaRDD<String>
lines = sc.textFile("REAMME.md");
JavaPairRDD<String, Integer>
nums = lines.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String x) {
return new Tuple2<String, Integer>(x.split(" ")[0], 1);
}
});

JavaPairRDD<String, UserData>
userDataRDD = lines.mapToPair(new PairFunction<String, String, UserData>() {
public Tuple2<String, UserData> call(String x) {
return new Tuple2<String, UserData>(x.split(" ")[0], new UserData(x));
}
})
.partitionBy(
new HashPartitioner(10))
.persist(StorageLevel.MEMORY_ONLY());
JavaPairRDD<String, Link>
linkDataRDD = lines.mapToPair(new PairFunction<String, String, Link>() {
public Tuple2<String, Link> call(String x) {
return new Tuple2<String, Link>(x.split(" ")[0], new Link(x));
}
});

userDataRDD.join(linkDataRDD)
.saveAsTextFile(
outputPath);

sc.close();
}
}

StorageLevel 에 대해서는 추후 다시 설명하기로 한다. 좀 더 상세히 알고 싶다면 아래 공식페이지를 확인하자.


이렇게하면 하나의 RDD 에 대해서만 shuffle 이 발생한다.
shuffle 이란? 각 worker 에 존재하는 데이터 partition 에 대해서 다른 partition 으로 이동하는 것을 의미한다.

처음 join 에 비해 network cost 가 많이 줄어들었음을 확인할 수 있다.
상대적으로 데이터가 큰 RDD 를 partitionBy() 한 뒤 persist() (local 메모리에 저장) 해야 network cost 를 최대한 줄일 수 있다.

이런 방법으로 이점을 얻을 수 있는 RDD operation 에는 아래와 같은 operation 들이 있다.
  • cogroup()
  • groupWith()
  • join()
    • leftOuterJoin()
    • rightOuterJoin()
  • groupByKey()
  • reduceByKey()
  • combineByKey()
  • lookup()

책 66 페이지에 PageRank 알고리즘을 설명하는 예제가 있다.
어렵지 않으니, 각 자 한번 살펴보도록 하자.

// Assume that our neighbor list was saved as a Spark objectFile
val links = sc.objectFile[(StringSeq[String])]("links")
              .partitionBy(new HashPartitioner(100))
              .persist()

// Initialize each page's rank to 1.0; since we use mapValues, the resulting RDD
// will have the same partitioner as links
var ranks = links.mapValues(v => 1.0)

// Run 10 iterations of PageRank
for (i <- 0 until 10) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size))
  }
  ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
}

// Write out the final ranks
ranks.saveAsTextFile("ranks")


마지막으로 Custom Partition 에 대해서 살펴보자.
데이터 분석을 하다보면 hash 또는 range partition 이 아닌 다른 용도의 partition 이 필요한 경우가 있다.

예를 들면 URL이 key 인 경우가 있을 수 있다.
blog.naver.com/test1234/1
blog.naver.com/test1234/2
두 개 url 을 분석한다고 하자. 분석이 빠르려면 동일 장비 내에 같은 partition 에 있으면 좋을 것이다.
하지만 hash partition 을 작용한다면 두 URL 정보는 서로 다른 partition 에 위치하게 된다. 이럴 때 Custom Partition 을 활용하면 된다.

getPartition(Object key)
numPartitions() 

위 두개의 method 만 구현하면 된다.(아래 코드 참고)

package com.tomining.spark.tutorial.example.ch4;

import java.net.URL;

import org.apache.spark.Partitioner;

public class DomainNamePartitioner extends Partitioner {
private int numPartition;

public DomainNamePartitioner(int numParts) {
numPartition = numParts;
}

@Override
public int getPartition(Object key) {
try {
String
domain = new URL(key.toString()).getHost();
int code = domain.hashCode() % numPartitions();
if (code < 0) {
return code + numPartitions();
}
else {
return code;
}
}
catch (Exception e) {

}

return 0;
}

@Override
public int numPartitions() {
return numPartition;
}

}


공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함