본문 바로가기
BigData/Spark

Learning Spark Chapter. 5 Loading & Saving Data

by Tomining 2015. 7. 17.
Spark 에서 사용할 수 있는 파일 포멧은 아래와 같다.

  • Text File
  • JSON
  • CSV
  • SequenceFiles
  • Protocol buffers
  • Object files




Text File

텍스트형 파일을 RDD 로 읽으려면 간단히 textFile() 을 사용하면 된다.

JavaRDD<String> lines = sc.textFile("REAMME.md");

만약 여러 파일을 읽어 RDD 로 만들고 싶다면 2가지 방법이 있다.

  • textFile(“디렉토리 경로”) 또는 textFile(“디렉토리/*txt”)
    디렉토리 경로를 전달하거나 wildcard 를 활용할 수 있다.
  • wholeTextFiles()
    PairRDD 로 반환되며, key 는 파일명이 된다.

RDD 를 파일로 저장하려면, saveAsTextFile() 을 간단히 사용할 수 있다.




JSON

TextFile 과 유사하다. JSON 데이터로 파싱하는 단계가 하나 더 있을 뿐이다.
Json Parser 를 만드는 비용이 비싼편인데, 이를 재활용하기 위해서는 mapPartitions() 를 사용할 수 있다.

(아래 예제 참고)

package com.tomining.spark.tutorial.example.ch5;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonInput implements Serializable {
public void execute() {
SparkConf
conf = new SparkConf().setMaster("local").setAppName("JsonInput");
JavaSparkContext
sc = new JavaSparkContext(conf);

JavaRDD<String>
lines = sc.textFile("REAMME.json");
JavaRDD<Person>
result = lines.mapPartitions(new ParseJson());
}
}

class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
public Iterable<Person> call(Iterator<String> lines) throws Exception {
List<Person>
people = new ArrayList<Person>();
ObjectMapper
mapper = new ObjectMapper();

while(lines.hasNext()) {
String
line = lines.next();
try {
people.add(mapper.readValue(line, Person.class));
}
catch (Exception e) {
e.printStackTrace();
}
}

return people;
}
}

class Person {
private String name;
private String age;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getAge() {
return age;
}

public void setAge(String age) {
this.age = age;
}
}

Json 유형으로 RDD 를 저장하는 것도 비슷하게 해결할 수 있다.

(아래 샘플 코드 참고)

package com.tomining.spark.tutorial.example.ch5;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonOutput implements Serializable {
public void execute(String outputPath) {
SparkConf
conf = new SparkConf().setMaster("local").setAppName("JsonOutput");
JavaSparkContext
sc = new JavaSparkContext(conf);

JavaRDD<String>
lines = sc.textFile("REAMME.json");
JavaRDD<Person>
result = lines.mapPartitions(new ParseJson());

JavaRDD<String>
formatted = result.mapPartitions(new WriteJson());
formatted.saveAsTextFile(outputPath);
}
}

class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
public Iterable<String> call(Iterator<Person> people) throws Exception {
List<String>
text = new ArrayList<String>();
ObjectMapper
mapper = new ObjectMapper();

while (people.hasNext()) {
Person
person = people.next();
text.add(mapper.writeValueAsString(person));
}

return text;
}
}




CSV/TSV

JSON 방식과 유사하다. 단지 Json Parser 가 아닌 CSV Library 가 사용되는 점만 차이가 있다.
책에서는 opencsv 라이브러리를 활용하고 있다.




Sequence Files

Hadoop Map&Reduce 에서 input/output 으로 사용되던 파일 포멧이다.
SequencesFile 은 Writable 인터페이스를 구현한 요소들로 구성되어 있는데,
Hadoop 의 Writable 인터페이스는 java.io.Serializable 을 구현하고 있지 않기 때문에 반드시 map() 을 통해 변환을 해 주어야 한다.

scala/java/Hadoop Writable 타입을 비교해 둔 자료가 책(P. 81)에 있으니 참고하길 바란다.

SequenceFile 은 spark 1.1.0 부터 python 에 대해서도 지원하기 시작하였다.

Spark 는 SequenceFile 을 읽기 위해 별도의 API 를 제공하고 있다.
sequenceFile(path, keyClass, valueClass, minPartitions)

package com.tomining.spark.tutorial.example.ch5;

import java.io.Serializable;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SequenceFileLoad implements Serializable {
public void execute(String fileName) {
SparkConf
conf = new SparkConf().setMaster("local").setAppName("SequenceFileLoad");
JavaSparkContext
sc = new JavaSparkContext(conf);

JavaPairRDD<Text, IntWritable>
input = sc.sequenceFile(fileName, Text.class, IntWritable.class);
JavaPairRDD<String, Integer>
result = input.mapToPair(new ConvertToNativeType());
}
}

class ConvertToNativeType implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) throws Exception {
return new Tuple2(record._1.toString(), record._2.get());
}
}

SequenceFile 로 저장하려면 PairRDD 타입이어야 한다.
RDD.saveAsSequenceFile(outputFile) 을 이용하여 저장할 수 있으며, 이렇게 저장된 파일은 Writable 이 구현된 자료형이 아니다.
따라서 Hadoop Map&Reduce 에서도 사용할 수 있는 SequenceFile 을 위해서는 Writable 자료형으로 변환해 주어야 한다.




Object Files

Object files 은 간단히 말하면 Sequence File 을  Wrapping 해 둔 파일이다. 이로 인해 RDD 를 저장할 때 단지 value 값만 저장할 수 있게 해준다.
Seqeunce File 은 앞에서 설명한 것처럼 org.apache.hadoop.io 에 구현되어 있는 Writable 인터페이스에서 Serialization 되는 반면, Object File 은  java.io.Serializable 를 사용한다.
서로 다른  Serialization 를 사용하기 때문에 동일한 Ojbect 라도 Hadoop output 에 차이가 있을 수 있다.
다른 데이터 유형과는 다르게 Object 파일은 Spark Job 간에 통신시 사용된다. Java Serialization 은 다소 느릴 수 있다.

Object File 을 읽고 저장하는 방법은 다소 간단하다.
저장시에는 saveAsObjectFile(path) 을 읽을 때는 objectFile(path) 를 사용하면 된다.

이런 단점들이 있음에도 불구하고 Object File 을 사용하는 이유는 Arbitrary Object(임시객체?) 로 저장할 때 변환작업이 필요없기 때문이다.

Object File 은 파이썬에서는 지원되지 않지만, 유사한 기능을 하는 Pickle File 이 있다.
saveAsPickleFile(), pickleFile() 을 사용하여 데이터 저장과 내용을 읽을 수 있다.

* Pickle 이란? 파이썬에서 Serialization 을 지원하는 라이브러리이다. 다소 느릴 수 있다.




Hadoop input/output Format

Spark 가 Hadoop 에서 지원하는 Format 과 호환이 된다.
이 때  Hadoop API 중 과거 버전과 신 버전을 모두 지원한다.(단, API 명이 조금 다름)

Hadoop 의 input format 을 읽을 때에는 newAPIHadoopFile(path, inputClass, keyClass, valueClass) 을 사용할 수 있다. 
추가적으로 별도의 Hadoop 설정을 사용하고자 한다면, conf 객체를 전달 할 수도 있다.

textFile 에서 key/value 유형의 데이터를 읽기 위해 KeyValueInputFormat 클래스를 사용할 수 있다.
해당 클래스는 Hadoop 에 포함되어 있기 때문에 별도의 dependency 를 필요로 하지 않는다.

Hadoop 의 output format 을 저장할 때에는 Sequence File 저장하는 방식과 동일하다. 그러나 Java 에서는 PairRDD 에서 saveAsNewAPIHadoopFile() 같은 함수를 지원하지 않기 때문에 별도의 변환 작업이 필요하다.(SequencFile 설명시에도 언급되었던 내용이다.)

package com.tomining.spark.tutorial.example.ch5;

import java.io.Serializable;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class HadoopOutputFormat implements Serializable {
public void execute(List<Tuple2<String, Integer>> input, String fileName) {
SparkConf
conf = new SparkConf().setMaster("local").setAppName("HadoopOutputFormat");
JavaSparkContext
sc = new JavaSparkContext(conf);

JavaPairRDD<String, Integer>
rdd = sc.parallelizePairs(input);
JavaPairRDD<Text, IntWritable>
result = rdd.mapToPair(new ConvertToWritableTypes());

result.saveAsHadoopFile(fileName, Text.class, IntWritable.class, SequenceFileOutputFormat.class);
}
}

class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) throws Exception {
return new Tuple2(new Text(record._1), new IntWritable(record._2));
}
}




파일 시스템 외에 Data Source들
key/value 방식의 데이터 저장소(Hbase, MongoDB 등)에서 hadoopDataset/saverAsHadoopDataset 과 newAPIHadoopDataset/saveAsNewAPIHadoopDataset 을 사용할 수 있다.




파일 압축

Spark 의 기본적인 데이터 유형인 textFile 이나 SequenceFile 의 경우도 기본적인 자동적으로 압축이 지원된다.

압축 Option 에는 아래와 같은 것이 있다.
  • gzip
    linux 장비에는 기본적으로 설치되어 있으며, 압축 성능은 빠른 편이다.
  • lzo
    각 worker node 에 설치가 필요하며, gzip 보다 빠르다.
  • bzip2
  • zlib
  • Snappy
    상당히 빠른 압축 포멧이나, 현재는 Spark 에서 지원하지 않는다.

상세한 내용은 책 88 페이지를 참고하자.




파일 시스템
아래 3가지 유형의 파일시스템을 사용할 수 있다.
이외에도 Custom Apdater 를 사용한다면 다른 유형의 storage 도 사용 가능할 것이다.
  • Local FS
    각 worker node 에서 접근하는 경로가 모두 같아야 한다.
  • Amazon S3
  • HDFS

Spark SQL
아래 두가지 유형의 데이터를 지원한다.
  • Apache Hive
  • JSON

Database
Spark 에서는 4가지 common connector 를 지원한다.
  • Jdbc connector
    MySQL, Postgres 등 RDBMS 를 지원한다.
    new JdbcRDD(sc, connection, sql, lowerBound, upperBound, numPartitions, mapRow, evidence);
    • connection : DB 연결 function
    • sql : SQL
    • lowerBound/upperBound : 데이터를 가져올 범위
    • numPartitions : 파티션 수
    • mapRow : ResultSet 을 파싱할 Mapper 함수
    • evidence : 최종 결과물에 적용할 함수. 예를 들면 logging
  • Cassandra
    • Java 와 Scala 만 지원한다
    • Cassandra Connector(OpenSource) 를 Dependency 에 추가해야 한다.
  • Hbase
    org.apache.hadoop.hbase.mapreduce.TableInputFormat 을 구현한 Input format 을 통해 Hbase 에 접근 가능하다.
    이 format 은 key/value 타입의 반환한다.(아래 타입 참조)
    Spark에서 Hbase 데이터를 가져오려면 SparkContext.newAPIHadoopRDD() 를 사용하면 된다.
    • key - org.apache.hadoop.hbase.io.ImmutableBytesWritable
    • value - org.apache.hadoop.hbase.client.Result
  • Elasticsearch
    Elasticsearch-Hadoop 을 이용해서 데이터를 읽고, 쓰기가 가능하다.
    Elastic search OutputFormat Connector 는 Spark Wrapper 를 지원하지 않기 때문에 saveAsHadoopDataset() 을 사용하는 것이 아니라 별도의 세팅을 해 주어야 한다.