티스토리 뷰

여기에서는 Spark Applicatoin 을 어떻게 설정하고 tunning 및 debugging 은 어떻게 하는지 알아보고자 한다.

SparkConf 를 통해서 Spark 설정하기

Spark Application 의 Runtime 설정을 통해서 간단히 Tuning 을 해 볼 수 있다. 기본적인 설정은 SparkConf 클래스를 통해서 설정할 수 있는데. SparkConf 는 SparkContext 를 생성할 때 필요로 한다. 아래 예제에서 확인해 보자.

public class SparkConfExample implements Serializable {
public void execute() {
SparkConf
conf = new SparkConf();
conf.set("spark.app.name", "My Spark App");
conf.set("spark.mater", "local[4]");
conf.set("spark.ui.port", "36000");

JavaSparkContext
sc = new JavaSparkContext(conf);
}
}

위 예제는 Java 에서 SparkConf 를 설정하는 예제이다.
SparkConf 는 spark.app.name 이나 spark.master 를 설정하기 위해 일반적인 API 인 setAppName() 과 setMaster() 를 제공한다.
즉, 아래와 같이 설정할 수 있다.

public class SparkConfExample implements Serializable {
public void execute() {
SparkConf
conf = new SparkConf();
conf.setAppName("My Spark App");
conf.setMaster("local[4]");
conf.set("spark.ui.port", "36000");

JavaSparkContext
sc = new JavaSparkContext(conf);
}
}

이 예제는 SparkConf 를 통해서 Spark Application 내에서 설정하는 방법이다.
 
Spark 는 spark-submit 을 통해서 동적으로 환경설정을 할 수 있도록 기능을 제공한다.

$ bin/spark-submit \
—class com.example.MyApp \
—master local[4] \
—conf spark.ui.port=36000 \
myApp.jar

spark-submit 은 file 에 설정되어 있는 config 정보도 로딩할 수 있도록 지원한다. 기본적으로 전달하지 않으면 Spark 설치 디렉토리 내에 conf/spark-defaults.conf 파일을 사용한다. --properies-file 을 통해서 다른 파일에 설정할 수도 있다.

$ bin/spark-submit \
—class com.example.MyApp \
—properties-file my-config.conf \
myApp.jar
## my-config.conf 파일 내용
spark.master         local[4]
spark.app.name    “My Spark App"
spark.ui.port         36000

Spark 설정을 여러 곳에서 할 수 있는데, 우선 순위는 아래와 같다.

  1. SparkConf.set() 을 이용하여 설정한 정보
  2. spark-submit 호출 시 전달하는 정보
  3. —properties-file 로 전달하는 정보
  4. conf/spark-defaults.conf 정보

위에서 언급하지 않는 설정정보 외에도 많은 항목들이 제공되며, 기본값이 존재하는 항목들도 있다. 아래 Spark 공식문서를 참고하자.

Components of Execution : Job, Tasks and Stage

튜닝이나 디버깅 할 때 처음 해야하는 작업은 Spark 내부 구조를 이해하는 것이다. 이전 장에서 논리적인 RDD 나 Partition 에 대해서 살펴보았다. Spark Application 이 실행될 때 논리적인 구조가 Task들 내에서 Operation을 그룹화 하면서 물리적인 실행계획으로 변경된다.

예를 들어서 살펴보자.


이전 장에서도 언급했듯이, action 타입의 Operation 이 실행되지 않으면, 어떠한 수행도 발생되지 않는다. 대신에 RDD 의 DAG(Directed Acyclic Graph) 를 정의한다. 이는 RDD 가 어떤 RDD 또는 Input Resource 로부터 생성이 되었는지 논리적으로 정의 한 것을 말한다.
예를 들면

     val b = a.map()

위처럼 b라는 RDD 를 a를 통해 생성했다고 가정하자. b 는 a Reference를 부모 RDD 로 갖고 있다. 이런 Reference 를 통해서 RDD 의 생성과정이 추적 가능하다.
RDD 계보(Lineage) 를 확인하기 위해서는 toDebugString 를 통해서 간단히 확인이 가능하다.

RDD 가 어떻게 파생되었는지 확인할 수 있다.

Action 타입의 Operation 이 수행되기 전에 RDD 는 추후에 만들어질 수 있도록 RDD 의 metadata 를 저장하고 있다.
위 예제에서 counts RDD 를 collect 해보자.

counts.collect()


Spark Scheduler 는 collect() 를 수행하기 위해 물리적 실행계획을 생성하게 되는데, 이 때 모든 RDD Partition 은 모두 실제 존재하게 되고, Driver 에 전송된다. Spark Scheduler 는 최종 RDD 를 생성할 때 미리 생성되어 있어야 할 RDD 를 찾는다. 즉, 부모 RDD 의 부모 RDD, 그리고 그 부모의 부모 RDD 를 반복적으로 찾는데, 이렇게 반복적으로 필요한 RDD 를 생성하기 위한 물리적 실행계획을 만들어 나간다.

물리적 실행계획은 RDD Graph 와 정확히 1대1 매칭이 되는 건 아니다. 이는 Spark Scheduler 가 pipelining 을 수행하거나 여러 개의 RDD 가 하나의 Stage 로 합쳐질 때 그렇게 되는데, pipelining 은 Data 이동 없이 부모 RDD 를 생성할 수 있을 때 발생한다. 아래 그림을 보면 물리적 실행계획에서 Stage 1 를 보면 확인할 수 있다.


Spark Web UI(http://master_ip:4040) 을 보면 좀 더 상세한 정보를 확인 할 수 있다.

추가적으로 RDD persist 와 cache 가 있는데, RDD 를 생성해 나가는 과정에서 이미 persist 나 cache 된 RDD 가 있다면, 다시 생성하지 않고 기존 RDD 를 그대로 활용한다. 이 외에도 Shuffle RDD 는 Disk 에 output 을 쓰게 되는데, 이는 cache 된 RDD 와 동일한 역할을 한다.

Spark Web UI

http://master_ip:4040 을 접속하면 아래와 같은 UI  화면을 볼 수 있다.
Spark 버전마다 약간의 차이가 있을 수 있다.

Driver and Executor Logs

Spark Web UI 나 toDebugString 에서 확인할 수 있는 내용은 나름 유용하긴 하지만, 때론 Spark Driver 나 Executor 가 직접 남기는 Log 를 확인할 필요가 있을 때가 있다. Log 에는 내부 경고로그(Warn Logs) 나 Exception 강세를 확인할 수 있다.

  • Standalone 모드
    • Application Log 는 Master Web UI 에서 바로 확인이 가능
    • 기본적으로 각 worker node 에 work/ 디렉토리 하위에 저장
  • Mesos
    • Mesos Slave 장비 work/ 디렉토리에 저장
    • Mesos Master UI 에서 확인 가능
  • YARN
    • YARN logs collection tool 사용
      사용법 : yarn logs -applicationId <app Id>
      이를 사용하기 위해서는 App 이 완전히 종료된 뒤에 사용이 가능하다. 그 이유는 YARN 도 각 Worker 의 log 를 수집해서 보여주기 때문이다.
    • YARN Cluster UI 에서 각 Worker Node 를 클릭하면, ResourceManager UI 에서도 확인 가능

Spark 는 내부적으로 log4j 를 사용하고 있는데, Spark 를 설치하면 기본적으로 conf/log4j.properties.template 파일을 제공한다.

 Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

root 의 기본값은 INFO 로 설정되어 있음을 확인할 수 있다. 상황에 따라 조절하면 되며, DEBUG 이하 설정은 개발환경에서만 사용하길 권장한다.

주의사항

spark-submit 을 사용하여 --file 옵션을 줄 때에는 jar 파일에 log4j.properties 파일이 포함되지 않도록 주의해야 한다. Log4j 는 Classpath 내에서 log4j.properties 파일을 검색하는데, 처음 발견된 파일을 적용한다. 따라서 직접 설정한 log4j.properties 가 적용되지 않을 수 있다.


성능 개선 주요사항

병렬성 수위(Level of Parallelism)

Spark RDD 는 여러 개의 Partition 으로 구성될 수 있는데, 기본적으로 Spark Scheduler 가 Task 를 수행할 때, 하나의 Task 에서는 하나의 Partition 을 가진다. 따라서 적절한 병렬성 수위를 갖는게 중요하다. Input RDD 의 병렬성 수위는 저장시스템에 따라 다를 수 있는데, HDFS 의 경우는 HDFS 파일 내 각 Block 들이 하나의 Parition 을 구성한다. 그리고 다른 RDD 로부터 파생되어 만들어지는 RDD 는 부모 RDD 의 Partition 크기를 따른다.

이런 병렬성 수위는 성능에 두 가지 방향으로 영향을 준다.

  • 병렬성 수위가 낮을 때, 자원(CPU/Memory)들이 유휴(Idle) 상태일 수 있다.
  • 병렬성 수위가 높을 때, 각 Partition 들이 합쳐질 때 약간의 Overhead 가 있을 수 있다.

Spark 는 병렬성 수위를 조절하기 위해 두 가지 Operation 을 제공한다.

  • repartition()
    random shuffle 을 통해서 원하는 Partition 수로 재 구성한다.
  • coaleasce()
    shuffle 작업을 하지 않기 때문에, repartition() 보다 효과적일 수 있다.

직렬화 유형(Serialization Format)

Spark 에서 사용되는 직렬화 유형은 아래와 같다.

  • Java Serialization(기본값)
  • Kryo Serialization(Scala/Java 만 지원)
  • 그 외 Serialization

Kryo Serialization 을 사용하기 위해서는 아래와 같이 설정이 필요하다.

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Be strict about class registration
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))

Kryo 나 Java Serialization 을 사용할 때 Java Serialization 을 구현하지 않은 클래스에서는 NotSerailzableException 이 발생할 수 있다. 이런 경우 어떤 클래스에서 문제가 되는지 찾기가 어려울 수 있는데, 이 때 “-Dsun.io.serialization.extended.DebugInfo=true” 옵션을 사용하면 된다. spark-submit 사용시 --driver-java-options 나 --executor-java-options 를 통해서 설정할 수 있다.

메모리 관리(Memory Management)

Spark 는 메모리를 다른 방법으로 사용하고 있는데, 이를 이해하고 있으면, Application 을 최적화하는데 도움이 된다. 각 Executor 내부에서 메모리는 몇 가지 유형으로 사용된다.

  • RDD Storage(60%)
    RDD 의 persist() 나 cache() 때 사용하는 공간으로 spark.storage.memoryFraction 옵션을 통해서 설정된다.
  • Shuffle/Aggregation Buffer(20%)
    shuffle 이나 aggregation Operation 수행시 사용되는 buffer 공간으로 spark.shuffle.memoryFraction 옵션을 통해 설정된다.
  • User Code(20%)
    User Code 에도 메모리가 필요할 수 있는데, 예를 들면, Application 에서 큰 배열이나 다른 객체를 갖고 있을 때 메모리가 사용된다. 이는 JVM heap 영역 중 RDD storage 와 Shuffle/Aggregation Buffer 가 사용되고 남은 공간을 전부 사용한다.

기본적으로 10G 를 할당했다면, RDD 는 6G, shuffle/aggegation buffer 는 2G, User Code는 나머지 2G 를 사용하게 된다.

하드웨어 할당(Hardware Provisioning)

Spark 에서 하드웨어 자원은 Application 수행시간에 큰 영향을 준다. Cluster 용량에 영향을 주는 주요 파라메터는 아래와 같다.

  • 각 Executor 에 얼마만큼의 메모리를 할당하는지?
  • 각 Executor 는 몇 개의 CPU core 를 사용하는지?
  • 전체 Executor 갯수는 어떻게 되는지?

이를 설정하기 위해 YARN 에서는 몇가지 파라메터를 제공한다.

  • spark.executor-cores 또는 —executor-cores
  • —num-executors

상세설정을 어떻게 하면 좋은지는 아래 블로그를 참고하자.


공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2024/04   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함