BigData/Spark

Spark Dataframe 에서 특정 컬럼 타입 변경

Tomining 2023. 10. 27. 20:13

Spark 에서 Schema 를 지정해 Dataframe 을 생성한다면 좋겠지만 csv 등 schemaless 하게 Dataframe 을 생성한 경우 타입을 변경해야 할 일이 있을 수 있다.

 

기본적으로 아래와 같이 캐스팅 하면 된다.

df.withColumn(columnName, col(columnName).cast("type"))​

 

만약 변경해야 하는 컬럼이 여러 개라면?

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val DEFAULT_COLUMN_TYPES_MAP = 
    Map(
        "SEQ" -> "decimal(9,0)",
        "AMT" -> "decimal(9,0)"
    )

object DataFrameExtensions extends Serializable {
    implicit class DataFrameExtension(df: DataFrame) extends Serializable {
        def applyColumnType(columnTypeMap: Map[String, String] = DEFAULT_COLUMN_TYPES_MAP): DataFrame = {
            var temp = df
            
            for (columnName <- df.columns) {
                val typeName = columnTypeMap.collectFirst {
                    case tp if columnName.contains(tp._1) => tp._2
                }.getOrElse("string")
                
                // println(s"convert $columnName -> $typeName")
                temp = temp.withColumn(columnName, col(columnName).cast(typeName))
                // df.printSchema()
            }
            
            return temp
        }
    }
}

import DataFrameExtensions._

위 예제는 DEFAULT_COLUMN_TYPES_MAP 에 컬럼명 패턴을 등록하고 일괄로 타입을 변경하는 코드다.

만약 특정 컬럼명을 full 로 작성한 경우라면 확장함수 내에 컬럼명 비교 구문을 equals 로 변경하면 된다.

columnName == tp._1

위처럼 구현 후 사용은 아래와 같이 해 줄 수 있다.

df.applyColumnType().printSchema()

 

만약 확장함수가 실행이 되지 않는다면... 확장함수 생성 후 import 로 활성화 해 주었는지 체크하자.

 

Dataframe 을 정의된 스키마로 읽을 수 있는 방법은 간단히 해 볼 수 있다.

참고: Spark실습DataFrame-생성-및-Schema-정의