BigData/Spark
Learning Spark Chapter. 10 Spark Streaming
Tomining
2015. 8. 20. 20:21
RDD 개념을 갖고 있는 Spark 와 유사하게 Spark Streaming 은 DStream 또는 Discretized Streams 라고 불리는 추상개념을 갖고 있다.
DStream 이란? 시간 흐름에 따른 순차적 데이터를 의미한다. 내부적으로 DStream 은 각 시점에 RDD 시퀀스이다. DStream 은 Flume, Kafka 또는 HDFS 같은 많은 Input 유형으로 부터 생성될 수 있다.
DStream 에는 두 가지 유형의 Operation 이 제공된다.
- Transformation
DStream 을 생성(파생) - Output
외부 데이터 저장소에 결과 저장
기본적으로 RDD 에 제공되는 Operation 과 시간 기반의 Operation(예를 들면 sliding window) 을 제공한다.
Batch 프로그램과 다르게 Spark Streaming 은 24/7 을 수행하기 위해서 몇 가지 추가 설정이 필요하다.
HDFS 같은 저장소에 데이터를 저장하기 위해 Spark Streaming 이 제공하는 주요 메커니즘 인 checkpointing 과 Application 오류가 발생했을 경우 어떻게 재시작 하는지, 자동으로 재시작 할 수 있게 하기 위해 어떻게 설정하는지에 대해 알아보자.
A Simple Example
간단한 예제를 살펴보자.
시스템 Log Stream 을 받아서 error 를 포함하고 있는 line 만 출력해보자.
- StreamingContext 생성
- batch interval(1초) 설정(신규 데이터가 어떤 주기로 생성되는지)
- socketTextStream() 사용하여 DStream 생성
- “error” 를 포함한 라인만 선별(filter)
- prrint()
위 코드는 데이터를 받았을 때, 어떤 동작을 할 지 정의한 것이다. 데이터를 받기 시작하기 위해서는 StreamingContext.start() 함수를 호출해 주어야 한다. 그런 다음 Spark Streaming 이 Spark Job 을 시작하는데, 이는 각각의 Thread 로 동작한다. 따라서 Application 이 종료되지 않고 연산을 마칠 때까지 유지하기 위해서 SparkContext.awaitTermination() 을 호출해 주어야 한다.
StreamingContext 는 한 번만 시작될 수 있으며, DStream 이나 Output 설정이 완료된 뒤 시작되어야 한다.
위에서 만든 예제를 돌려보자.
* 위도우 사용자는 nc 대신에 ncat 을 사용해야 한다.
Architecture and Abstraction
Spark Streaming 은 “micro-batch” 구조를 사용한다. 이는 작은 데이터 묶음을 연속된 Batch Series 처럼 데이터를 처리한다.
정해진 주기마다 새로운 Batch 가 생성되는데, 각 주기 내에서
- 시작부분
새로운 Batch 를 생성하고, 들어온 데이터를 Batch 에 추가한다. - 중간부분
Batch Process - 끝부분
Batch 진행이 완료된다.
Batch 주기는 보통 500 micro second 에서 수 초 정도로 설정한다.
Spark Streaming 구조는 아래와 같다.
Spark Streaming 프로그래밍은 Stream 내에 일부 데이터로 구성된 RDD Sequence 인 Discretized Stream 또는 DStream 를 처리하는 것이다.
DStream 은 외부 Input Source 로 부터 생성하거나, 기존 DStream 을 변환(Transformation) 하여 생성할 수 있다. 앞에서도 설명했듯이, 기존 RDD 에서 지원하는 Transformation Operation 물론이고, DStream 은 시간 범위 내에 데이터를 집계할 수 있는 “Stateful Transformation” 도 제공한다.
위 Simple Example 에서는 Socket 을 통해서 받은 데이터로부터 DStream 을 생성하고 Filter 함수를 적용하였다.
예제를 실행시켜보면 아래와 같은 결과를 볼 수 있다.
“micro-batch” 구조의 Spark Streaming 을 잘 표현해 주고 있는 결과 로그이다. Batch 주기(샘플에서는 1초) 별로 해당 로그가 출력된다. 이렇듯 Spark Streaming 은 많은 작은 Job 들로 구성이 되는데, 이를 Spark UI 에서 확인할 수 있다.
Transformation Operation 외에도 DStream 은 print() 같은 Output Operation 도 제공한다. 이는 RDD 의 Action Operation 과 유사한데, 각 Batch 에서 주기적으로 실행이 되며, Batch 결과로 처리된다.
Driver-Worker 구조에서 Spark Streaming 수행은 아래와 같이 일이난다. 각 Input 데이터를 받기 위해 receiver 를 실행한다. 이 receiver 는 Executor 와 함께 수행되는데, Input Source 로부터 데이터를 수집하고, 이를 RDD 로 저장하는 역할을 한다. Fault Tolerance(유사시) 를 위해 데이터를 받아서 다른 Executor 에 복사를 해 둔다. 이 데이터는 Cached RDD 처럼 Executor 에 저장된다. Driver Program 의 StreamingContext 는 데이터를 처리하고 이전 Time 의 RDD 와 합치기 위해 주기적으로 Spark Job 을 수행한다.
위에서 잠깐 언급했듯이, Spark Streaming 은 Fault Tolerance 를 위해 Input Source 를 복제를 해 둔다. 그래서 특정 Worker 노드가 실패하더라도 복제된 데이터로 복구가 가능하다. 그러나 Lineage(계보) 를 이용해서 Application 이 실행된 이후 생성된 데이터를 다시 계산하는 것은 상당한 시간이 소요된다. 그래서 Spark Streaming 은 주기적으로 파일시스템(HDFS 나 S3) 에 저장할 수 있는 checkpointing 이라는 mechanism 을 이용한다. 보통 5~10 Batch 데이터를 저장하도록 설정한다.
Transformations
DStream 의 Transformation Operation 은 두가지로 구분될 수 있다.
- Stateless
이전 Batch 결과 데이터에 영향을 받지 않는 데이터를 처리
map(), filter(), reduceByKey() 같은 Operation 들 - Stateful
이전 Batch 결과 데이터를 현재 Batch 결과를 만들기 위해 사용
Sliding Window 나 기간 내 상태 추적 등이 있다.
Stateless Transformations
Function name | Purpose | Scala example | Signature of user-supplied function on DStream[T]
|
---|---|---|---|
|
Apply a function to each element in the DStream and return a DStream of the result. |
|
|
|
Apply a function to each element in the DStream and return a DStream of the contents of the iterators returned. |
|
|
|
Return a DStream consisting of only elements that pass the condition passed to filter. |
|
|
|
Change the number of partitions of the DStream. |
|
N/A |
|
Combine values with the same key in each batch. |
|
|
|
Group values with the same key in each batch. |
|
N/A |
위 Operation 들이 전체 Stream 에 적용되는 것처럼 보이지만, 여러 RDD 로 구성된 각 DStream 내부적으로는 각 RDD 별로 구분되어 적용된다. 예를 들면 reduceByKey() 는 전체 기간에 대해서 적용되는 것이 아니라 각 Batch 별로 reduce 를 실행한다.
stateless transformation 은 각 단계별로 여러 개의 DStream 을 병합할 수 있다. 예를 들면 Key/Value DStream 는 RDD 처럼 Join 관련 Transformation 을 지원한다.
cogroup(), join(), leftouterjoin() 등등
DStream 은 RDD 내부에서 직접 동작할 수 있도록 하기 위해 transform() 이라는 좀 더 높은 레벨의 Operator 를 지원한다. transform() 은 DStream 에 적용하기 위한 RDD 사이에 모든 함수를 제공한다. 이 함수는 새로운 Stream 을 만들어 내기 위해 각 Data Batch 내에서 수행할 수 있다.
보통 transform() 함수는 RDD 에 적용한 처리를 재사용하기 위해 사용한다. 예를 들어 extractOutliers() 를 사용했다면, transform() 을 통해 재사용 할 수 있다는 것이다.(예제 참조)
여러 개의 DStream 으로부터 데이터를 병합(combine) 또는 변환(transform) 한다면, StreamingContext.transform 또는 DStream.transformWith(otherStream, func) 을 사용할 수 있다.
Stateful Transformations
Stateful Transformation 은 Fault Tolerance 를 위해 checkpointing 을 활성화 해야 한다.
로컬에서 개발을 진행한다면, hdfs 가 아니라 local 경로를 사용해도 된다.
Windowed Transformcation
windowed operation 은 Batch 주기보다 긴 사간의 Streaming 결과를 처리할 수 있다.
windowed operation 은 두 개의 파라메터를 필요로 하는데, 이는 다음과 같다.
- window duration
- sliding duration
window() 함수를 통해 windowed operation 을 수행할 수 있지만, Spark Streaming 이 제공하는 몇 가지 효율적이고 편한 함수들이 있다.
먼저, reduceByWindow() 나 reduceByKeyAndWindow() 는 각 Window 내에서 좀 더 효율적으로 동작한다. 이런 함수들은 + 처럼 전체 window 에 대해서 하나의 reduce 함수를 갖는다. 그리고 reduce 작업을 점진적으로 수행 가능하도록 특정 형태를 갖는데, 이는 reduce 의 역함수인 -(+와 반대) 도 있다. 이렇게 역함수를 갖고 있으면 좀 더 유용하다.
위 그림처럼 window() 함수를 활용하면 좀 더 효율적으로 operation 을 수행할 수 있다.
Data Counting 을 할 때, DStream 에서는 countByWindow()와 countByValueAndWindow() 를 제공한다.
- countByWindow - 각 window 내에 Element 들의 갯수(Unique Element)
- countByValueAndWindow - 각 window 내에 value 갯수(Unique Value)
UpdateStateByKey Transformations
때로는 DStream 내 여러 Batch 들을 하나의 state 로 관리할 필요가 있을 때 유용하다. updateStateByKey() 함수는 Key/Value 형태의 DStream 에서 상태변수를 갱신할 수 있게 한다.
예를 들어 웹서버 로그에서 UserId 가 Key 인 특정 사이트 접속 이벤트가 있다고 가정하자. UpdateStateByKey() 를 사용하면, 각 사용자가 접근한 최근 10 페이지를 추적할 수 있다. 이 페이지 목록은 상태(State) 정보이고, 각 Event 가 들어올 때마다 갱신된다.
updateStateByKey() 를 사용하기 위해 update(events, oldState) => newState 를 제공하는데, 각 항목은 아래와 같다.
- events - 현재 Batch 에서 들어오는 Event 목록
- oldState - Optional 한 상태 정보이며, Option 형태로 저장된다. 만약 값이 없다면 해당 Key 에 대한 이전상태가 없음을 의미한다.
- newState - 함수의 반환 값이며, Option 형태이다. 상태를 삭제하고자 할 때, 빈 Option 을 반환할 수도 있다.
updateStateByKey() 의 반환값은 RDD(key, state) pair 를 포함하는 DStream 이다.
아래 예제는 로그 메시지에서 HTTP 응답 코드별 count 값을 계산하는 예제이다.
Output Operation
Output Operation 이란 Stream 에서 최종 데이터에 대한 특정 처리를 지칭한다. (예를 들어 DB 에 데이터를 저장하거나, 스크린에 출력해주거나)
이전에도 언급된 print() 함수는 debug output operation 으로 보통 사용된다. 이는 DStream 의 각 Batch 마다 10개 항목을 출력해 준다.
Debugging 을 할 때, 결과를 저장하기 위해서 Output Operation 을 사용하기도 하는데, DStream 에도 save() 같은 함수가 있다.
디렉토리와 suffix(Optional) 을 파라미터로 받는다.
Hadoop OutputFormat 을 사용하기 위해 saveAsHadoopFiles() 를 사용한다. 그러니 Spark Streaming 은 saveAsSequenceFiles() 를 지원하지 않지만, 아래와 같이 SequenceFiles 도 저장할 수 있다.
마지막으로 froeachRDD() 도 일반적인 Output Operation 에 해당된다. transform() 처럼 각 RDD 를 접근할 수 있고, Spark 가 지원하는 모든 Action Operation 을 사용할 수 있다. 예를 들면 MySql 같은 DB 에 저장할 수 있다. Spark 는 saveAs() 같은 함수는 없지만, foreachPartition() 같은 함수로 내용을 저장할 수 있다.
Input Sources
Spark Streaming 은 기본적으로 여러가지 데이터 Source 를 갖고 있다. 몇몇 source 들은 Maven Artifact 에 포함되어 있고, 나머지 source 들은 spark-streaming-kafka 처럼 별도의 Maven Artifact 로 제공된다.
Core Sources
- socket
- file
- akka actors
Additional Source
- Apache Kafka
- Apache Flume
- push-based receiver
receiver 는 Apache Flume 이 데이터를 전달해주는 Sink 처럼 동작 - poll-based receiverreceiver 는 Custom Sink 로부터 데이터를 가져 올 수 있음
- push-based receiver
Multiple Sources and Cluster Sizing
DStream 에 union() 을 이용하여 여러 Source 로 부터 받은 데이터를 합칠 수 있다.
간혹 Multiple Receiver 데이터 통합 성능을 위해 필요할 수 있다.(Single Receiver 를 사용하는 경우 병목현상이 발생할 수도 있다.)
24/7 Operation
Spark Streaming 에서 가장 큰 장점 중 하나는 Fault Tolerance(고장방지) 보장이다. input 데이터가 저장된 뒤 Spark Streaming 은 Worker 나 Driver 가 죽더라도 항상 정확한 결과를 내 놓는다.
24/7 을 수행하려면 좀 특별할 설정이 필요하다. 먼저 HDFS 나 S3 에 checkpointing 설정을 해야한다. 그리고 drvier 프로그램이나 이상한 input 데이터에 대한 별도 처리 코드가 필요하다.
Checkpointing
Checkpointing 은 Spark Streaming 에서 Fault Tolerance(고장방지) 를 위한 주요 메커니즘이나. 이는 데이터 복구를 위해서 주기적으로 HDFS 나 S3 같은 저장소에 데이터를 저장한다. Checkpointing 에는 두 가지 목적이 있는데, 아래와 같다.
- 오류가 발생했을 때, 다시 RDD 를 계산하는 것을 방지한다.
- Driver 에 대한 Fault Tolerance 를 보장한다.
이런 이유로 Checkpointing 을 설정하는 것은 아주 중요하다.
local 모드로 사용할 때에는 Checkpointing 을 위해 local 파일시스템 경로로 설정해도 된다. 그러나 production level 에서는 복제기능을 제공하는 HDFS 나 S3 를 사용해야 한다.
Driver Fault Tolerance
Drvier node 의 Fault Tolerance 를 위해서는 StreamingContext 를 좀 다른 방법으로 생성해야 한다. StreamingContext.getOrCreate() 를 사용하면 된다.
최초 실행시에는 checkpoint dir 이 존재하지 않기 때문에 StreamingContext 를 생성하게 되고, Driver 오류로 죽은 뒤 재실행 될 때에는 checkpoint dir 을 이용하여 생성하게 된다.
대부분의 Cluster Manager 는 Driver 가 죽었을 때 자동적으로 재시작해 주지 않는다. 따라서 monit 같은 툴을 이용하여 모니터링을 하고, 상황에 맞게 재시작을 해 주어야 한다. 이 때 가장 좋은 방법은 Standalone Cluster Manager 에서 지원하는 --supervise 를 사용하는 것이다. 이는 Spark 가 Driver 를 재시작할 수 있게 해준다. 또한 --deploy-mode cluster 옵션을 통해 Driver 가 local 에서 수행되는 것이 아니라 Cluster 내에서 수행될 수 있도록 해야한다.
이런 옵션을 사용할 때, Spark Standalone Master 에 대해서도 Fault Tolerance 를 지원하고 싶을 때가 있다. 이 때는 Zookeeper 를 사용하면 된다.
Worker Fault Tolerance
Worker node 를 위해서는 Spark Streaming 과 Spark 는 동일하다. 외부로부터 받은 모든 데이터를 Worker node 에 복사를 해 두는 것이다.
Receiver Fault Tolerance
Receiver 를 갖고 있는 Worker node 를 위해서는 추가적으로 고려해야 할 사항들이 있다. Spark Streaming 은 죽은 Receiver 를 다른 node 에서 재시작하게 된다. 그러나 이 때는 수신한 모든 데이터를 잃게 된다. Flume 을 예로 들면, 두 종류의 Receiver 에 대해서 중요한 차이점은 데이터 손실 보장에 있다.
- receiver-pull-from-sink model
Spark 는 복제한 항목을 그냥 제거한다. - push-to-receiver model
데이터를 복제하기 전에 receiver 가 죽으면 데이터 손실이 있을 수 있다.
일반적으로 어떤 receiver 든 데이터 손실을 보장하기 위해서는 fault-tolerance properties 를 잘 설정해야 한다.
Receiver 는 두 가지 사항을 보장한다.
- 모든 데이터는 복제되어 있기 때문에 복구가 가능하다.(checkpoint)
- kafka, push-based flume, twitter 같은 source 에서 받은 데이터를 다른 노드에 복사해 둔다. 단, Receiver 가 죽으면 데이터 손실이 있을 수 있다.
Processing Guarantees
Spark Streaming 의 Fault Tolerance 보장 때문에 Spark 는 모든 Transformation 에 대해 exactly-once semantics(정확히 한번만 수행하는 것)을 제공한다. Worker 가 죽더라도 결과는 항상 동일하다.
그러나 변환 결과를 output operation 을 통해 외부에 저장할 때에는 실패가 발생할 경우 여러번 수행될 수 있다. 그래서 시스템 적으로 이런 경우를 처리할 수 있도록 만들어야 한다. 예를 들면 transaction 처리를 할 수 도 있고, 여러 번 갱신되더라도 결과는 동일하게 구조를 가져갈 수 도 있다.(멱등성)
Streaming UI
Spark 는 Streaming UI 를 제공한다. 일반적으로 http://<driver>:4040 으로 접속할 수 있다.
Performance Considerations
Batch and Window Sizes
일반적으로 Batch Size 는 500 ms 로 설정한다. 처음에는 약 10초정도로 크게 설정하고 줄여가면서 적당한 시간을 찾는 것이 좋다.
비슷한 방법으로 Window Operation 에서는 결과를 산출해내는 주기가 성능에 큰 영향을 준다. 만약 무거운 연산을 한다면 이 주기값을 늘려야 한다.
Level of Parallelism
각 Batch 의 처리시간을 줄이는 방법은 병렬 처리 수준을 높이는 것이다. 병렬 처리 수준을 높이는 3가지 방법을 소개한다.
- Receiver 갯수를 늘려라.
- 수신한 데이터를 repartition 하라.
- aggregation 수행시 병렬수준을 높혀라.
Garbage Collection and Memory Usage
Java 의 CMS(Concurerent Mark-Sweep) GC 를 사용함으로서 STW(Stop The World) 시간을 줄일 수 있다.
추가적으로 RDD 를 cache 할 때 직렬화된 형태로 저장한다면 GC 부담을 덜 수 있다. Kryo 직렬화를 사용한다면 좀 더 GC 부담을 줄일 수 있을 것이다.
Spark 는 Memory 제거 방식을 설정할 수 있는데, 기본적으로는 LRU 알고리즘을 적용하지만, spark.cleaner.ttl 속성을 이용하여 특정 기간이 지난 데이터에 대해서는 제거할 수 있도록 설정할 수도 있다.