BigData/Spark

RDD persist() or cache() 시 주의사항

Tomining 2015. 7. 17. 22:23
RDD persist() 나 cache() 를 사용할 때에는 주의사항이 필요하다.

예를 들어 아래와 같이 코드를 작성했다고 가정하자.
lines 라는 JavaRDD 데이터를 persist() 를 통해서 caching 처리하였다.

StorageLevel.MEMORY_ONLY() 가 기본값이며, 여기서는 StorageLevel.MEMORY_ONLY_SER() 을 사용하였다.

package com.tomining.spark.tutorial.example.etc;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class RddPersist implements Serializable {
public void execute() {
SparkConf
conf = new SparkConf().setMaster("local").setAppName("RddPersist");
JavaSparkContext
sc = new JavaSparkContext(conf);

JavaRDD<String>
lines = sc.textFile("REAMME.md");

lines.persist(StorageLevel.MEMORY_ONLY_SER());

System.
out.println(String.format("line count is %d", lines.count()));
}
}

위 코드를 수행하면 무슨 문제가 있을까?

persist() 를 사용하면 spark driver program 이 종료되더라도 memory 에 상주하게 된다.
LRU 정책에 따라 memory 에 유지되는데, 이를 별도로 해제해주지 않으면 계속 상주하게 된다.


위 차트는 32GB 장비에서 Spark Application 을 수행했을 때, 장비의 memory 사용률을 확인 한 것이다.
약 20GB 정도가 memory cached 영역으로 잡혀있고, application 이 종료되더라도 제거되지 않는다.
붉은 부분은 장비에서 drop_cache 를 통해 강제로 해제해 준 케이스이다.
그 후 재수행을 해보면 또 다시 memory 영역을 차지하는 것을 확인할 수 있다.

그렇다면 이를 어떻게 해결할 수 있을까?
일단 spark persist 관련 문서에 보면 아래와 같이 설명하고 있다.
cache 를 제거하기 위해서는 RDD method 인 unpersist() 를 호출해 주어야 한다고 되어 있다.

Removing Data


Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the€RDD.unpersist()€method.

아래 URL 에서도 동일한 설명을 하고 있다.

그렇다면 코드를 고쳐보자.

package com.tomining.spark.tutorial.example.etc;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class RddPersist implements Serializable {
public void execute() {
SparkConf
conf = new SparkConf().setMaster("local").setAppName("RddPersist");
JavaSparkContext
sc = new JavaSparkContext(conf);

JavaRDD<String>
lines = sc.textFile("REAMME.md");

lines.persist(StorageLevel.MEMORY_ONLY_SER());

System.
out.println(String.format("line count is %d", lines.count()));

lines.unpersist(); //cache 영역 제거
}
}

unpersist() 를 호출하여 수동으로 memory 에서 제거해 주었다.

32GB 장비에서도 잘 해제가 되는지 memory 사용률 그래프를 확인해 보자.




persist 되는 RDD 의 경우는 변경이 없는 경우이거나 하나의 RDD 가 여러 Application 에서 공유될 때 사용하는 것이 바람직하다.
즉, memory 에 올려두고 여러 번 사용할 때 더 효율적이라는 의미이다.
한 번 읽어서 사용하고 재사용되지 않는다면, memory 에 남길 필요가 없다.