기본적으로 spark는 데이터를 처리하기 위한 툴인만큼, 데이터 처리는 가장 핵심적인 요소. spark에서의 데이터 처리는 위의 항목들 정도로 정리할 수 있음.
Play with Dataframe Row
1 수동으로 row 및 dataframe 만들기
예를 들어, 날짜 칸이 모두 date이 아닌 string 형태로 되어있는 녀석에 대한 conversion을 하는 함수를 만들때 쓰임.
2 driver에 dataframe row 모으기
위의 과정을 유닛 테스트로 발전시키고 싶다고 치자. 이때, 기본적으로 dataframe은 병렬화를 위해 많은 노드들에 분산 저장되어있기 때문에, 이따금씩 관련한 전체 데이터를 가지고 뭔가 처리가 필요할 때가 있다. 이때는 각 노드의 관련 데이터들을 모두 뭉쳐야 하기 때문에 에너지도 많이 쓰이기 때문에, 어지간하면 안 쓰는 게 좋다고는 한다. 어쨌든 필요하다고 하면, collect를 쓰면 모든 로컬 (드라이버 노드) 에서 받아서 python의 list의 형태로 쓸 수가 있다.
3 spark에서 개별 row 처리하기
예를 들어 schema가 없거나 없는 거나 다름없는 반정형, 비정형의 경우 printSchema 하면 string 끝. 이렇게 나옴.
이런 경우는 대부분의 high-level 처리가 어려움.
아래와 같이 regex를 활용해서, 컬럼별로 나눠주고, regexp_extract 써서 필요한 컬럼만 꺼내주면 됨.
Play with Dataframe Columns
위의 row 3번에서 본것도 실은 column에 대한 이야기 같긴 한데, 암튼 column에 접근하는 것은 2가지 방법이 있다.
둘 중에 편한 거 쓰면 되고, 사실 둘을 혼합해서 써도 됨.
1 column string
airlinesDF.select("Origin", "Dest", "Distance").show(10)
2 column object
아주 여러가지가 있음. 예를 들어 아래와 같이 3개의 방식이 있음.
airlinesDF.select(column("Origin"), col("Dest"), airlinesDF.Distance).show(10)
3 column에다가 SQL 써버리기 ★ - 많이 쓰임
airlinesDF.select(",,,", expr("to_date(concat(Year,Month,Day), 'yyyyMMdd') as FlightDate))
User Defined Functions
우리가 만든 function을 driver에서도 활용할 수 있게 하기 위해서는, function을 정의한 후 이를 드라이버에 저장해야 함.
1 column object expression
아래와 같은 방식으로, function을 만들고 나서는 udf라는 라이브러리를 활용해서 이를 드라이버에 등록하는 절차를 거쳐야 함.
parse_gender_udf = udf(parse_gender, StringType())
survey_df2 = survey_df.withColumn("Gender", parse_gender_udf("Gender"))
2 sql expression
이건 sql function으로 저장하는 방식임. sql 식이기 때문에 catalog에 저장하는 것.
spark.udf.register("parse_gender_udf", parse_gender, StringType())
MISC 기능들에 대하여 - 플젝을 하면서 필요하면 듣기
'데이터 엔지니어링' 카테고리의 다른 글
[Spark] 캡스톤 프로젝트🚧: from HIVE to Kafka (0) | 2025.02.23 |
---|---|
[Spark] A급 💯 데이터 엔지니어를 위한 Aggregations & Joins (0) | 2025.02.22 |
[Spark] Spark의 매우 도움되는 API 친구들 (0) | 2025.02.20 |
[Spark] 본격적인 프로그래밍 on spark 🧑💻 (0) | 2025.02.19 |
[Spark] 스파크 실행 모델과 구조 (0) | 2025.02.18 |