BigData/Spark
Learning Spark Chapter. 9 Spark SQL
Tomining
2015. 7. 31. 21:43
구조적 데이터(Structured Data)와 반구조적(SemiStructured Data) 를 다룰 수 있는 Spark SQL 과 Spark Interface 를 소개한다.
구조적 데이터란? Schema 를 갖고 있는 데이터를 의미한다. 만약 구조적 데이터를 다룰 때, Spark SQL 을 사용하면 쉽고, 효율적으로 다룰 수 있다.
- 다양한 데이터 유형 처리 가능
- SQL 을 사용하여 쿼리 가능
- RDD 와 SQL Table 을 Join 하는 기능을 포함하여 기존 코드(spark-core)와 통합이 가능
이런 기능들을 제공하기 위해 Spark SQL 은 SchemaRDD 를 사용한다. 이는 Row 객체의 RDD 이며, 각 아이템은 Record 를 의미한다. SchemaRDD 는 기존 RDD 와 유사해 보이지만 내부적으로는 좀 더 효율적인 방법으로 저장되고, Schema 의 이점 또한 활용한다. 그리고 SQL 수행 같은 기존 RDD 에서는 제공되지 않는 Operation 도 제공한다. 이런 SchemaRDD 는 외부 데이터 소스나 조회 결과 또는 일반적은 RDD 로부터 생성이 가능하다.
SchemaRDD 를 이용하여 Spark SQL 이 구조적인 데이터를 어떻게 읽고 조회하는지 알아보자.
Linking with Spark SQL
Spark SQL 은 두 가지 타입이 제공된다.
-
Hive 포함
Hive Table, UDF(User-Defined Function), SerDes(Serialization&Deserialization), HiveQL 사용 가능
Apache Hive 가 꼭 설치되어 있을 필요는 없음. - Hive 미포함
Spark Download 페이지에서 빌드된 바이너리를 다운로드 받는 경우, 이미 Hive 를 포함하고 있는 버전이다. 만약 source code 를 받아서 수동으로 빌드를 하게 된다면, sbt 빌드시 -Phive 옵션을 줘야 한다.
sbt/sbt -Phive assembly
|
Scala 나 Java 에서 사용하려면 Maven Lib 이 필요하다. 아래와 같이 dependency 설정을 하면 된다.
<dependency>
<groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>1.3.0</version>
</dependency>
|
만약 Hive 를 추가할 수 없다면, spark-sql_2.10 을 추가하면 된다.
Spark SQL 을 사용할 때는, Hive 라이브러리를 사용할 수 있는지 없는지에 따라 두 가지 사용법이 있다.
- HiveContext - Hive 설치 필요없음
- SQLContext - Hive 라이브러리가 필요없음
Spark SQL 과 함께 HiveQL(이하 HQL) 을 사용하면 좋다.
만약 이미 설치된 Hive 가 있다면, Hive 설정파일인 hive-site.xml 을 Spark 설정 디렉토리($SPARK_HOME/conf)로 복사해야 한다. Hive 가 설치되지 않았더라도 Spark 는 그대로 수행된다.
Using Spark SQL in Applications
Spark SQL 을 사용하려면 HiveContext 를 생성해야 한다. HiveContext 는 Spark SQL 데이터를 조회 또는 연산되기 위해서 몇 가지 편리한 함수들을 제공하고 있다.
HiveContext 를 사용하면 일반적인 RDD 에서 map() 함수처럼, 구조적 데이터인 SchemaRDD 를 생성하고 SQL Operation 을 수행할 수 있다.
Initializing Spark SQL
Spark SQL 을 시작하려면 몇 가지 Import 가 필요하다.
Import 가 되었다면, HiveContext 를 생성해 보자.
이렇게 하면 데이터를 읽거나 조회할 준비가 되었다.
Basic Query Example
HiveContext 나 SQLContext 에서 제공되는 sql() 함수를 통해서 쿼리를 수행해 볼 수 있다.
아래 예제는 JSON 파일을 읽어서 tweets 라는 임시 테이블에 저장한 뒤 쿼리를 수행하고 있다.
SchemaRDDs
데이터를 읽거나 쿼리를 수행한 뒤 결과는 SchemaRDD 로 반환되는데, SchemaRDD 는 기존 RDBMS 에서의 Table 과 유사하다.
SchemaRDD 는 컬럼 타입정보를 포함하여, Row 형태의 RDD 라고 할 수 있다. Row 객체는 Primitive Type 의 객체의 Wrapper Class 이다.
(Spark 상위버전에서는 SchemaRDD 가 DataFrame 으로 변경될 수 있다.)
SchemaRDD 는 일반적인 RDD 타입으로 기존 RDD 에 제공되는 map() 이나 filter() 같은 Operation 은 모두 사용할 수 있다. SchemaRDD 에서 가장 중요한 특징은 HiveContext.sql 이나 SparkContext.sql 을 통해서 임시 테이블로 등록이 가능하다는 점이다.(registerTempTable() 도 활용할 수 있다.)
SchemaRDD 에서 사용되는 자료유형은 아래와 같다.
Spark SQL/HiveQL type | Scala type | Java type | Python |
---|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
마지막에 언급된 Structure 는 Spark SQL 의 다른 Row 가 될 수 있다. 모든 타입은 중첩해서 사용할 수 있는데, 예를 들면 Structure 배열이나, Structure 로 구성된 Map 을 들 수 있다.
Working with Row objects
SchemaRDD 내에 있는 Row 객체는 간단히 말하면 고정된 길이의 Field Arrays 라고 할 수 있다. Row 객체에는 각 field 를 접근하기 위해 getter 함수를 제공힌다.
- get()
Object 타입으로 반환된다. - getType()
각 자료형으로 반환된다.
ex) getString(0)
Python 에서는 타입변환이 필요하지 않다. 그냥 row[i] 나 row.column_name 방식으로 접근할 수 있다.
Caching
Spark SQL 에서의 Caching 은 조금 다르게 동작한다. 이미 각 컬럼의 데이터 유형을 알기 때문에, 좀 더 효율적으로 데이터를 저장할 수 있다.
hiveCtx.cacheTable(“tableName”) 메소드를 통해서 caching 을 할 수 있다. Caching 된 테이블은 Drive Program 이 수행되는 동안에만 유지가 된다. 그래서 종료된 뒤에는 다시 사용하려면 다시 Caching 을 해야한다. 따라서 동일한 데이터를 가지고 다수의 작업을 수행하거나 쿼리를 수행할 때 Caching 을 사용한다.
또한 HiveQL/SQL 을 사용할 수 있는데, Cache 또는 UnCache 를 할 때는 아래와 같이 할 수 있다.
- CACHE TABLE tableName
- UNCACHE TABLE tableName
Cache된 SchemaRDD 는 기존 RDD 와 동일하게 Spark Application UI 에서 확인할 수 있다.
Loading and Saving Data
Spark SQL 은 Hive Table, JSON, Parquet file 등 을 지원한다. 쿼리를 수행하거나 일부 칼럼만 선택하고자 할 때, Spark SQL 은 SparkContext.hadoopFile 처럼 모든 데이터를 스캔하는 것이 아니라 필요한 일부 컬럼만 스캔한다.
일반 RDD 에서 Schema 만 설정해 줌으로서 SchemaRDD 로 변환할 수 있는데, 이는 Java나 Python 데이터이더라도 쉽게 SQL 을 수행할 수 있게 한다. 그리고 많은 양의 데이터를 한 번에 계산할 수도 있고, 서로 다른 저장소에서 읽은 SchemaRDD 를 Join 할 수도 있다.
Apache Hive
SerDes, Text file, RCFile, ORC, Parquet, Avro 그리고 Protocol Buffer 같은 Hive 가 지원하는 데이터 유형은 모두 Spark가 지원한다.
이미 설치된 Hive 에 연결하기 위해서는 hive-site.xml 파일을 $SPARK_HOME/conf/ 하위에 복사해 주어야 한다. 만약 단지 조회만 한다면 hive-site.xml 을 설정하지 않고 local Hive metastore 를 사용하게 된다.
Parquet
Parquet 는 컬럼 기반의 데이터 저장 포멧이다. 이는 Spark SQL 의 모든 데이터 유형을 지원한다. Spark SQL 은 Parquet File 을 바로 읽을 수 있는 Method 를 지원한다.
Parquet file 을 Spark SQL 의 임시 테이블로도 등록할 수 있다.
저장도 가능하다.
JSON
동일한 포멧을 갖는 JSON 파일이 있다면, Spark 에서 사용가능하다.
JSON 파일 내용을 읽으려면 jsonFile() 함수를 사용핳면 된다. 읽은 JSON 파일의 schema 를 확인하려면 SchemaRDD.printSchema() 를 사용하면 확인 할 수있다.
Tweet 메시지의 Schema 를 확인해 보자.
Spark SQL 을 사용할 때 중첩된 field 접근하려면 .(dot) 을 사용하면 된다.
ex) topLevel.nextLevel
From RDDs
SchemaRDD 는 기존 RDD 로 부터 생성할 수 있다.
Python 에서는 Row객체의 RDD 를 생성하고, inferSchema() 를 호출한다.
Java 의 경우, getter/setter 를 각 속성별로 갖고 있고, 직렬화(Serializalbe) 이 구현되어 있는 클래스 RDD 에서 applySchema() 함수를 통해 SchemaRDD 로 변환할 수 있다.
JDBC/ODBC Server
Spark 는 BI(Business Intelligence) 사용할 때 유용한 JDBC connection 도 지원한다. JDBC Server 는 여러 Client 들이 사용할 수 있도록 Standalone Driver Program 처럼 수행된다. Spark SQL JDBC Server 는 Hive 에서 지원하는 HiveServer2 와 연동된다. 이는 Thrift Server 인데, Thrift protocol 을 사용한다.
sbin/start-thriftserver.sh
sbin/stop-thriftserver.sh
자세한 설정은 책 175 페이지를 참고하자.
많은 외부 툴들은 Spark SQL 을 사용할 때, ODBC Driver 를 사용한다. Spark SQL ODBC Driver 는 Simba 를 이용하여 만들어졌고, 여러 Spark vendor 에서 다운로드 받을 수 있다. 보통 Microstrategy 나 Tableau 같은 BI 둘에서 많이 사용된다.
Working with Beeline
Beeline client 를 이용하여 HiveQL 을 사용할 수 있다. HiveQL 에 대한 내용은 여기서 언급하지 않는다. 다만 몇가지 Operation 에 대해서 설명한다.
1. Data 를 읽어서 Table 을 생성
Hive 는 ,(comma) 로 구분된 CSV 같은 파일로 된 데이터를 로딩할 수 있도록 지원한다.
2. 테이블 목록 조회 및 Schema 확인
만약 Table 을 Caching 하고자 한다면, CACHE TABLE tableName 을 사용하면 된다. 후에 Cache 에서 해제할 땐, UNCACHE TABLE tableName 을 사용하자.
3. 실행계획 확인
Long-Lived Tables and Queries
Spark SQL JDBC Server 를 사용하는 이점 중 하나는 table 을 caching 하여 여러 프로그램에서 공유할 수 있는 것이다. 이는 JDBC Thrift Server 가 단일 Driver Program 이기 때문이다. 이를 사용하기 위해서는 table 을 등록하고, CACHE 명령어를 사용하기만 하면 된다.
User-Defined Functions
SQL 에서 사용할 수 있는 사용자 정의 함수(이하 UDF)를 Python/Scala/Java 모두 지원한다. Spark SQL 은 UDF 와 Hive UDF 모두 지원한다.
Spark SQL UDFs
Spark SQL 은 UDF 를 쉽게 등록할 수 있도록 내장 함수를 제공한다. Scala 나 Python 에서는 native function 이나 lambda 표현을 사용할 수 있고, Java 에서는 UDF class 를 상속해서 구현해야 한다. UDF 는 다양한 타입을 갖고 있다.
Python 이나 Java 에서는 SchemaRDD 유형 중 하나를 반환한다. Java 의 경우는 이 타입을 org.apache.spark.sql.api.java.DataType 에서 확인할 수 있고, Python 은 DataType 을 import 해야 한다.
Hive UDFs
Spark SQL 은 표준 Hive UDF 를 사용할 수 있다. 만약 UDF 를 직접 만들고자 한다면, UDF 클래스가 포함된 Jar 를 Spark Application 수행시 포함될 수 있도록 --jar 로 추가해 주어야 한다. 또한 Hive UDF 를 사용하기 위해서는 SparkContext 가 아니라 HiveContext 를 사용해야 한다.
hiveCtx.sql(“CREATE TEMPORARY FUNCTION name AS class.function”)
Spark SQL Performance
Spark SQL 에서는 Caching 된 데이터를 사용할 때, 메모리상의 컬럼형 데이터를 사용한다. 이는 Caching 할 때 적은 공간을 사용할 뿐만 아니라 데이터 일부만 읽고자 할 때, Spark SQL 은 일부의 데이터(field) 만 읽는다.
Spark SQL 은 Oracle 에서 지원되는 Predicate Push-Down 이 지원된다. Spark 에서 특정 row 만 읽고자 하더라도 기본적으로는 전체 데이터를 읽고 filter 를 수행한다. 하지만 Spark SQL 에서는 키 범위의 일부 데이터만 추출할 수 있고, 결과를 갖고 올 때 아주 작은 데이터만 읽어서 처리한다.
Performance Tuning Option
Spark SQL 에서는 성능 개선을 위해 많은 Tuning Option 을 제공한다. (아래 표 참고)
JDBC Connector 나 Beeline shell 을 사용할 때, set command 를 통해서 옵션들을 설정한다.
기존에는 Spark SQL 에서 아래와 같이 spark.sql.codegen 옵션을 설정하기도 하였다.
몇 가지 옵션들은 설정할 때 신중해야 하는데, 두 가지를 예를 들어 설명한다.
- spark.sql.codegen
해당 속성이 true 라면 SQL 을 매번 Java Bytecode 로 변환하여 수행한다. 쿼리가 아주 길거나 자주 수행되는 쿼리에 대해서는 부분적으로 빠르지만 짧은 쿼리나 ad-hoc 쿼리 같은 경우에는 매번 SQL 을 변환해야 하기 때문에 overhead 가 있을 수 있다. - spark.sql.inMemoryColumnarStorage.batchSize
기본값은 1000으로 설정되어 있고, 각 Batch 마다 압축을 한다. Batch Size 가 작은 경우 압축할 용량이 작지만, 아주 큰 경우에는 메모리 상에서 압축을 하기에는 너무 클 수 있기 때문에 문제가 될 수 있다. 만약 Row 가 아주 크다면, batch size 를 줄여서 OOM 이 발생하지 않도록 해야한다. 보통은 기본 값이면 적당하다.