[Spark] Spark의 매우 도움되는 API 친구들
Spark 위에서 우리가 활용할 수 있는 녀석들은 RDD, Spark SQL, DataFrame, DataSet
스파크가 처음 등장했을 때 분산처리를 위해 등장한 것이 RDD. 기본적인 동작은 되지만, 최적화도 직접 해줘야 하고, 암튼 오래된 냄새가 많이 남.
이후 Catalyst Optimizer라는 최적화 엔진이 등장하면서, 이 엔진의 최적화 대상이 아닌 RDD는 자연스레 사장되고, 그 대상인 Spark SQL, DataFrame, DataSet 이 각광 받음.
지금은 그 중에서도, 좀 쉬운 것들은 Spark SQL, 좀 어려운 건 DataFrame API로 어지간하면 다 처리하는 방식.
다만 이제 아무래도 DataFrame의 최적화가 테이블 형식의 데이터에만 적용되므로, 반정형, 비정형 데이터를 다룰 때는 가끔 RDD를 쓰기도 하고, 또 low-level을 다뤄야할 때 어쩔 수 없이 쓰기는 하지만 어쨌든 정말 특수한 경우가 아니고서야 그냥 RDD는 안 쓴다고 봐도 무방함.
Catalyst Optimzer가 어떻게 작동할까?
한마디로, 작성한 SQL/DataFrame 쿼리를 내부적으로 분석하고 최적화하여 빠르게 실행가능하게 변환하는 역할.
그 과정은,
1 Logical Optimization
2 Physical Optimization
3 코드 생성 최적화
그 결과를 체크해보려면, explain 함수를 써보면 됨.
Spark의 데이터 원천
데이터 원천으로 사용될만한 여지가 있는 것들은 다음과 같음.
JDBC, NoSQL, Cloud DWH, Stream 처리 원천 등.
이 각각으로부터 데이터를 가져오는 것은 크게 두가지 방법이 있음.
1 API를 활용해서 가져오기 (아래 그림의 주황색) - 보통 실시간 streaming 처리를 위함
2 store로 끌고 와서 우선적으로 저장하고 거기서 가져와서 활용하기 (아래 그림의 파란색) - batch를 위함.
Batch를 위한 데이터 끌어오기를 store를 활용하는 이유가 있음.
1 기본적으로 각 batch를 module 단위로 끊어서 생각하기 때문에 논리적으로 옳음.
2 Load Balance의 측면에서도 이게 맞고
3 보안과 유연성 측면에서도 이것이 훨씬 나음.
보통 이럴 때 store로 쓰이는 것은 HDFS, Amazon S3, Azure Blob, Google Cloud 등이 있음.
데이터 읽기 API - DataFrameReader API
데이터 원천에서 값을 가져올 때, 보통 csv, json, parquet 등의 파일을 많이 씀.
전에 다뤘듯, spark.read를 활용해서 값을 읽을 수 있음. 물론 shortcut 쓸 수 있겠지만 아래의 기본 틀을 지키는 것을 추천.
spark.read.format("csv").option("header", "true").schema(mySchema).load()
CSV 파일을 가져올 때, dataframe이 스키마(그 중에서도 데이터 유형)를 추론하게 할 수 있는데, 그게 잘 작용 안되는 경우가 있다. 특히나 Date. 이건 보통 string으로 받는 경우가 많음.
JSON은 infer schema를 안 시켜도 default로 infer를 하기 때문에 그 옵션을 넣을 필요는 없음. 그러나 이것도 date를 잘못 추론하곤 함.
Parquet은 infer schema를 기본으로 역시나 가지고 있고, 이건 date를 정확히 함. 그래서 결론은 뭐냐면, 앞으로는 무조건 이 중에 parquet를 쓰는 것이 좋다는 것.
이렇게 말고 explicit 하게 schema를 정해줄 수도 있다. 코딩으로 하거나, 쿼리로 하거나. 쿼리가 더 좋음 결과적으로.
1 프로그램 코드를 활용해서. 이렇게 하면 run을 돌렸을 때 타입에러가 뜨면 에러 처리를 해 줘야 함.
그 방법은 아래의 처리임.
.option("mode", "FAILFAST")
참고로, 기본으로는 spark data frame reader는 특정한 형태의 date만 받아들인다. 그래서 아래 꺼 시전해야함.
.option("dateFormat", "M/d/y")
2 DDL을 활용해서 - 참고로 이게 훨씬 쉬움. 걍 SQL 대로 써주면 그만임.
이렇게 만든 schema는 schema() 옵션에 넣어줄 수 있는 것임.
데이터 쓰기 API - DataFrameWriter API
왼쪽과 같은 틀이고, 활용한다면 오른쪽처럼 하면 됨. 디테일이 있지만, 프로젝트 할때 가서 깊게 공부하면 됨.
일반적으로 Sink라는 걸 활용함.
※ 이 일련의 과정에서 partition을 어떻게 하고, execution을 어떻게 하는 등의 디테일에 신경 쓰고, 더 깊이 공부할 수 있어야겠다. 매우 중요한 것 같음. 관련해서 앱 단에서 지정할 수 있는 repartition, partition_by 등의 함수부터 해서 모두 커버해보자. bucket 이라는 건, partition 시에 key로 나누기는 하지만, key의 개수만큼 많은 파일은 필요없을 때, 파일의 수를 한정하기 위해 활용하는 개념.
참고로 일반적으로 파일 하나는 500MB ~ n GB가 적당하다고 함. 이것은 아래 옵션으로 처리 가능.
.option("maxRecordsPerFile", 10000)
Spark DB and Tables
기본적으로 SQL을 쓴다고 하면 DB table, DataFrame을 쓴다고 하면 view를 쓰게 됨.
이때 table에 대해서는 1 managed table과 2 unmanaged table 이렇게 두가지가 존재함.
결과적으로는 managed table이 bucketing 등의 여러 기능을 지원하기 때문에 더 자주 활용됨.
managed table은 1 create table (해당 table의 정보를 metadata에 저장), 2 save table 이렇게 두 가지가 자동으로 실행됨.
save table은 spark.sql.warehouse.dir 폴더에 자동으로 무조건 저장된다고 보면 됨.
unmanaged table은 1만 하고, 2는 안 함. 오히려 아래 그림처럼, LOCATION을 명시해야 함. 그 부분에서 자유를 주는 것임. 그러나 잊지 말고 명시해줘야 함.
그럼 왜 dataframe으로 저장하는 것이 아니라 spark table (보통 managed) 로 저장을 해야할까?
재사용성 때문임. dataframe은 결국 dataframe reader로 다시 읽어야하지만, spark table로 저장하게 되면 tableau 등의 외부 툴들도 이 데이터에 접근할 수 있기 때문에 유용함.