BigData/Spark
RDD persist() or cache() 시 주의사항
Tomining
2015. 7. 17. 22:23
RDD persist() 나 cache() 를 사용할 때에는 주의사항이 필요하다.
예를 들어 아래와 같이 코드를 작성했다고 가정하자.
lines 라는 JavaRDD 데이터를 persist() 를 통해서 caching 처리하였다.
StorageLevel.MEMORY_ONLY() 가 기본값이며, 여기서는 StorageLevel.MEMORY_ONLY_SER() 을 사용하였다.
(각 level 별 차이점은 https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence 을 참고하자)
package com.tomining.spark.tutorial.example.etc;
import java.io.Serializable; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.storage.StorageLevel; public class RddPersist implements Serializable { public void execute() { SparkConf conf = new SparkConf().setMaster("local").setAppName("RddPersist"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("REAMME.md"); lines.persist(StorageLevel.MEMORY_ONLY_SER()); System.out.println(String.format("line count is %d", lines.count())); } }
|
위 코드를 수행하면 무슨 문제가 있을까?
persist() 를 사용하면 spark driver program 이 종료되더라도 memory 에 상주하게 된다.
LRU 정책에 따라 memory 에 유지되는데, 이를 별도로 해제해주지 않으면 계속 상주하게 된다.
위 차트는 32GB 장비에서 Spark Application 을 수행했을 때, 장비의 memory 사용률을 확인 한 것이다.
약 20GB 정도가 memory cached 영역으로 잡혀있고, application 이 종료되더라도 제거되지 않는다.
붉은 부분은 장비에서 drop_cache 를 통해 강제로 해제해 준 케이스이다.
그 후 재수행을 해보면 또 다시 memory 영역을 차지하는 것을 확인할 수 있다.
그렇다면 이를 어떻게 해결할 수 있을까?
일단 spark persist 관련 문서에 보면 아래와 같이 설명하고 있다.
cache 를 제거하기 위해서는 RDD method 인 unpersist() 를 호출해 주어야 한다고 되어 있다.
Removing DataSpark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the
RDD.unpersist() method. |
아래 URL 에서도 동일한 설명을 하고 있다.
그렇다면 코드를 고쳐보자.
package com.tomining.spark.tutorial.example.etc;
import java.io.Serializable; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.storage.StorageLevel; public class RddPersist implements Serializable { public void execute() { SparkConf conf = new SparkConf().setMaster("local").setAppName("RddPersist"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("REAMME.md"); lines.persist(StorageLevel.MEMORY_ONLY_SER()); System.out.println(String.format("line count is %d", lines.count())); lines.unpersist(); //cache 영역 제거 } }
|
unpersist() 를 호출하여 수동으로 memory 에서 제거해 주었다.
32GB 장비에서도 잘 해제가 되는지 memory 사용률 그래프를 확인해 보자.
persist 되는 RDD 의 경우는 변경이 없는 경우이거나 하나의 RDD 가 여러 Application 에서 공유될 때 사용하는 것이 바람직하다.
즉, memory 에 올려두고 여러 번 사용할 때 더 효율적이라는 의미이다.
한 번 읽어서 사용하고 재사용되지 않는다면, memory 에 남길 필요가 없다.