BigData/Spark
Learning Spark Chapter. 5 Loading & Saving Data
Tomining
2015. 7. 17. 22:20
Spark 에서 사용할 수 있는 파일 포멧은 아래와 같다.
- Text File
- JSON
- CSV
- SequenceFiles
- Protocol buffers
- Object files
Text File
텍스트형 파일을 RDD 로 읽으려면 간단히 textFile() 을 사용하면 된다.
만약 여러 파일을 읽어 RDD 로 만들고 싶다면 2가지 방법이 있다.
- textFile(“디렉토리 경로”) 또는 textFile(“디렉토리/*txt”)
디렉토리 경로를 전달하거나 wildcard 를 활용할 수 있다. - wholeTextFiles()
PairRDD 로 반환되며, key 는 파일명이 된다.
RDD 를 파일로 저장하려면, saveAsTextFile() 을 간단히 사용할 수 있다.
JSON
TextFile 과 유사하다. JSON 데이터로 파싱하는 단계가 하나 더 있을 뿐이다.
Json Parser 를 만드는 비용이 비싼편인데, 이를 재활용하기 위해서는 mapPartitions() 를 사용할 수 있다.
(아래 예제 참고)
package com.tomining.spark.tutorial.example.ch5;
import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import com.fasterxml.jackson.databind.ObjectMapper; public class JsonInput implements Serializable { public void execute() { SparkConf conf = new SparkConf().setMaster("local").setAppName("JsonInput"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("REAMME.json"); JavaRDD<Person> result = lines.mapPartitions(new ParseJson()); } } class ParseJson implements FlatMapFunction<Iterator<String>, Person> { public Iterable<Person> call(Iterator<String> lines) throws Exception { List<Person> people = new ArrayList<Person>(); ObjectMapper mapper = new ObjectMapper(); while(lines.hasNext()) { String line = lines.next(); try { people.add(mapper.readValue(line, Person.class)); } catch (Exception e) { e.printStackTrace(); } } return people; } } class Person { private String name; private String age; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getAge() { return age; } public void setAge(String age) { this.age = age; } }
|
Json 유형으로 RDD 를 저장하는 것도 비슷하게 해결할 수 있다.
(아래 샘플 코드 참고)
package com.tomining.spark.tutorial.example.ch5;
import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import com.fasterxml.jackson.databind.ObjectMapper; public class JsonOutput implements Serializable { public void execute(String outputPath) { SparkConf conf = new SparkConf().setMaster("local").setAppName("JsonOutput"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("REAMME.json"); JavaRDD<Person> result = lines.mapPartitions(new ParseJson()); JavaRDD<String> formatted = result.mapPartitions(new WriteJson()); formatted.saveAsTextFile(outputPath); } } class WriteJson implements FlatMapFunction<Iterator<Person>, String> { public Iterable<String> call(Iterator<Person> people) throws Exception { List<String> text = new ArrayList<String>(); ObjectMapper mapper = new ObjectMapper(); while (people.hasNext()) { Person person = people.next(); text.add(mapper.writeValueAsString(person)); } return text; } }
|
CSV/TSV
JSON 방식과 유사하다. 단지 Json Parser 가 아닌 CSV Library 가 사용되는 점만 차이가 있다.
책에서는 opencsv 라이브러리를 활용하고 있다.
Sequence Files
Hadoop Map&Reduce 에서 input/output 으로 사용되던 파일 포멧이다.
SequencesFile 은 Writable 인터페이스를 구현한 요소들로 구성되어 있는데,
Hadoop 의 Writable 인터페이스는 java.io.Serializable 을 구현하고 있지 않기 때문에 반드시 map() 을 통해 변환을 해 주어야 한다.
scala/java/Hadoop Writable 타입을 비교해 둔 자료가 책(P. 81)에 있으니 참고하길 바란다.
SequenceFile 은 spark 1.1.0 부터 python 에 대해서도 지원하기 시작하였다.
Spark 는 SequenceFile 을 읽기 위해 별도의 API 를 제공하고 있다.
sequenceFile(path, keyClass, valueClass, minPartitions)
package com.tomining.spark.tutorial.example.ch5;
import java.io.Serializable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; public class SequenceFileLoad implements Serializable { public void execute(String fileName) { SparkConf conf = new SparkConf().setMaster("local").setAppName("SequenceFileLoad"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(fileName, Text.class, IntWritable.class); JavaPairRDD<String, Integer> result = input.mapToPair(new ConvertToNativeType()); } } class ConvertToNativeType implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> { public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) throws Exception { return new Tuple2(record._1.toString(), record._2.get()); } }
|
SequenceFile 로 저장하려면 PairRDD 타입이어야 한다.
RDD.saveAsSequenceFile(outputFile) 을 이용하여 저장할 수 있으며, 이렇게 저장된 파일은 Writable 이 구현된 자료형이 아니다.
따라서 Hadoop Map&Reduce 에서도 사용할 수 있는 SequenceFile 을 위해서는 Writable 자료형으로 변환해 주어야 한다.
Object Files
Object files 은 간단히 말하면 Sequence File 을 Wrapping 해 둔 파일이다. 이로 인해 RDD 를 저장할 때 단지 value 값만 저장할 수 있게 해준다.
Seqeunce File 은 앞에서 설명한 것처럼 org.apache.hadoop.io 에 구현되어 있는 Writable 인터페이스에서 Serialization 되는 반면, Object File 은 java.io.Serializable 를 사용한다.
서로 다른 Serialization 를 사용하기 때문에 동일한 Ojbect 라도 Hadoop output 에 차이가 있을 수 있다.
다른 데이터 유형과는 다르게 Object 파일은 Spark Job 간에 통신시 사용된다. Java Serialization 은 다소 느릴 수 있다.
Object File 을 읽고 저장하는 방법은 다소 간단하다.
저장시에는 saveAsObjectFile(path) 을 읽을 때는 objectFile(path) 를 사용하면 된다.
이런 단점들이 있음에도 불구하고 Object File 을 사용하는 이유는 Arbitrary Object(임시객체?) 로 저장할 때 변환작업이 필요없기 때문이다.
Object File 은 파이썬에서는 지원되지 않지만, 유사한 기능을 하는 Pickle File 이 있다.
saveAsPickleFile(), pickleFile() 을 사용하여 데이터 저장과 내용을 읽을 수 있다.
* Pickle 이란? 파이썬에서 Serialization 을 지원하는 라이브러리이다. 다소 느릴 수 있다.
Hadoop input/output Format
Spark 가 Hadoop 에서 지원하는 Format 과 호환이 된다.
이 때 Hadoop API 중 과거 버전과 신 버전을 모두 지원한다.(단, API 명이 조금 다름)
Hadoop 의 input format 을 읽을 때에는 newAPIHadoopFile(path, inputClass, keyClass, valueClass) 을 사용할 수 있다.
추가적으로 별도의 Hadoop 설정을 사용하고자 한다면, conf 객체를 전달 할 수도 있다.
textFile 에서 key/value 유형의 데이터를 읽기 위해 KeyValueInputFormat 클래스를 사용할 수 있다.
해당 클래스는 Hadoop 에 포함되어 있기 때문에 별도의 dependency 를 필요로 하지 않는다.
Hadoop 의 output format 을 저장할 때에는 Sequence File 저장하는 방식과 동일하다. 그러나 Java 에서는 PairRDD 에서 saveAsNewAPIHadoopFile() 같은 함수를 지원하지 않기 때문에 별도의 변환 작업이 필요하다.(SequencFile 설명시에도 언급되었던 내용이다.)
package com.tomining.spark.tutorial.example.ch5;
import java.io.Serializable; import java.util.List; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; public class HadoopOutputFormat implements Serializable { public void execute(List<Tuple2<String, Integer>> input, String fileName) { SparkConf conf = new SparkConf().setMaster("local").setAppName("HadoopOutputFormat"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(input); JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes()); result.saveAsHadoopFile(fileName, Text.class, IntWritable.class, SequenceFileOutputFormat.class); } } class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> { public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) throws Exception { return new Tuple2(new Text(record._1), new IntWritable(record._2)); } }
|
파일 시스템 외에 Data Source들
key/value 방식의 데이터 저장소(Hbase, MongoDB 등)에서 hadoopDataset/saverAsHadoopDataset 과 newAPIHadoopDataset/saveAsNewAPIHadoopDataset 을 사용할 수 있다.
파일 압축
Spark 의 기본적인 데이터 유형인 textFile 이나 SequenceFile 의 경우도 기본적인 자동적으로 압축이 지원된다.
압축 Option 에는 아래와 같은 것이 있다.
- gzip
linux 장비에는 기본적으로 설치되어 있으며, 압축 성능은 빠른 편이다. - lzo
각 worker node 에 설치가 필요하며, gzip 보다 빠르다. - bzip2
- zlib
- Snappy
상당히 빠른 압축 포멧이나, 현재는 Spark 에서 지원하지 않는다.
상세한 내용은 책 88 페이지를 참고하자.
파일 시스템
아래 3가지 유형의 파일시스템을 사용할 수 있다.
이외에도 Custom Apdater 를 사용한다면 다른 유형의 storage 도 사용 가능할 것이다.
- Local FS
각 worker node 에서 접근하는 경로가 모두 같아야 한다. - Amazon S3
- HDFS
Spark SQL
아래 두가지 유형의 데이터를 지원한다.
- Apache Hive
- JSON
Database
Spark 에서는 4가지 common connector 를 지원한다.
- Jdbc connector
MySQL, Postgres 등 RDBMS 를 지원한다.
new JdbcRDD(sc, connection, sql, lowerBound, upperBound, numPartitions, mapRow, evidence);- connection : DB 연결 function
- sql : SQL
- lowerBound/upperBound : 데이터를 가져올 범위
- numPartitions : 파티션 수
- mapRow : ResultSet 을 파싱할 Mapper 함수
- evidence : 최종 결과물에 적용할 함수. 예를 들면 logging
- Cassandra
- Java 와 Scala 만 지원한다
- Cassandra Connector(OpenSource) 를 Dependency 에 추가해야 한다.
- Hbase
org.apache.hadoop.hbase.mapreduce.TableInputFormat 을 구현한 Input format 을 통해 Hbase 에 접근 가능하다.
이 format 은 key/value 타입의 반환한다.(아래 타입 참조)
Spark에서 Hbase 데이터를 가져오려면 SparkContext.newAPIHadoopRDD() 를 사용하면 된다.- key - org.apache.hadoop.hbase.io.ImmutableBytesWritable
- value - org.apache.hadoop.hbase.client.Result
- Elasticsearch
Elasticsearch-Hadoop 을 이용해서 데이터를 읽고, 쓰기가 가능하다.
Elastic search OutputFormat Connector 는 Spark Wrapper 를 지원하지 않기 때문에 saveAsHadoopDataset() 을 사용하는 것이 아니라 별도의 세팅을 해 주어야 한다.