Apache Spark와 RDD
Apache Spark 코드로 알아보기
개념 간단정리
Apache Spark
대용량의 데이터를 효율적으로 처리하는 빅데이터 분산처리 플랫폼
Spark 프로그램은 SparkSession을 만드는 것부터 시작 - Spark세션을 통해 Spark가 제공하는 다양한 기능을 사용 가능
공식문서 (https://spark.apache.org/docs/latest/cluster-overview.html)
RDD
Apache Spark의 기본적인 자료구조
RDD 연산
RDD 연산의 2가지 종류
- Transformations
- 기존의 데이터에서 새로운 데이터를 만듬
- ex) map, filter 등
- https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
- Actions
- 데이터셋에서 특정한 연산을 수행 후 결과값을 전달
- ex) count, take 등
- https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
코드진행
- 실습환경 체크
# 버전 확인
import pyspark
import numpy as np
import pandas as pd
import matplotlib as mpl
import seaborn as sns
print(pyspark.__version__)
print(np.__version__)
print(pd.__version__)
print(mpl.__version__)
print(sns.__version__)
SparkSession 만들기
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("my1stSpark").getOrCreate()
spark
RDD
# 0부터 10까지 데이터
num_values = range(10)
# RDD 객체 생성
num_values = spark.sparkContext.parallelize(num_values)
# 객체 타입 확인
print(num_values)
# ParalPythonRDD[11] at RDD at PythonRDD.scala:53lelCollectionRDD[4] at readRDDFromFile at PythonRDD.scala:274
- RDD Transformation & Action
# 생성한 객체를 세제곱
# RDD Transformation
cubic_values = num_values.map(lambda x : pow(x,3))
# RDD Action
for num in cubic_values.collect():
print(num)
#0
#1
#8
#27
#64
#125
#216
#343
#512
#729
- README.md 파일 분석하기
# 경로설정
file_path = 'data/README.md'
# 객체 생성
fileRDD = spark.sparkContext.textFile(file_path)
# Spark Transformation
# filter : Spark 글자를 포함한 line
fileRDD_filter = fileRDD.filter(lambda line : 'Spark' in line)
print(fileRDD_filter)
# PythonRDD[28] at RDD at PythonRDD.scala:53
# Spark Action
print(fileRDD_filter.count())
# 19
for line in fileRDD_filter.take(4):
print(line)
# # Apache Spark
# Spark is a unified analytics engine for large-scale data processing. It provides
# rich set of higher-level tools including Spark SQL for SQL and DataFrames,
# [![PySpark Coverage](https://codecov.io/gh/apache/spark/branch/master/graph/badge.svg)]
# (https://codecov.io/gh/apache/spark)
PairRDD
- RDD의 요소가 키와 값의 쌍을 이루는 경우
- 이러한 경우를 위한 연산들을 Pair RDD 연산이라고 한다
- 연산 종류 : https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations
- PairRDD생성
# 데이터 생성
data = [('python',10),('javascript',5),('JAVA',20), ('python',10),('R',5),('JAVA',10)]
data
#[('python', 10),
# ('javascript', 5),
# ('JAVA', 20),
# ('python', 10),
# ('R', 5),
# ('JAVA', 10)]
# 데이터를 PairRDD로 변환
regi_lan = spark.sparkContext.parallelize(data)
print(regi_lan)
# ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:274
# 데이터 타입체크
print(type(regi_lan))
# <class 'pyspark.rdd.RDD'>
# Action Method로 값 확인해보기
print(regi_lan.collect())
# [('python', 10), ('javascript', 5), ('JAVA', 20), ('python', 10), ('R', 5), ('JAVA', 10)]
- groupbyKey()
# groupbyKey 사용 예시 (sorted : 정렬)
# key값별 갯수
sorted(regi_lan.groupByKey().mapValues(len).collect())
# [('JAVA', 2), ('R', 1), ('javascript', 1), ('python', 2)]
# key값별 데이터
sorted(regi_lan.groupByKey().mapValues(list).collect())
# [('JAVA', [20, 10]), ('R', [5]), ('javascript', [5]), ('python', [10, 10])]
# 객체 생성
group_rdd = regi_lan.groupByKey().collect()
# 객체의 key값별 데이터를 한줄로
for keys, values in group_rdd:
print(keys, "-->", list(values))
# python --> [10, 10]
# javascript --> [5]
# JAVA --> [20, 10]
# R --> [5]
- reduceByKey()
- Key값을 통한 데이터 그룹화
- groupbyKey와 결과는 유사하지만, 내부적으로 동작하는 것이 다르다
- https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduceByKey.html?highlight=reducebykey
# reduceByKey 사용 예시 - Key값별로 계산해보기
# 더하기
regi_lan.reduceByKey(lambda x1, x2 : x1 + x2).collect()
# [('python', 20), ('javascript', 5), ('JAVA', 30), ('R', 5)]
# 뺄셈
regi_lan.reduceByKey(lambda x1, x2 : x1 - x2).collect()
# [('python', 20), ('javascript', 5), ('JAVA', 30), ('R', 5)]
- sortByKey
# sortByKey 사용 예시
# Key별로 정렬
print(regi_lan.sortByKey().collect())
# [('JAVA', 20), ('JAVA', 10), ('R', 5), ('javascript', 5), ('python', 10), ('python', 10)]
# 역순 정렬
print(regi_lan.sortByKey(ascending=False).collect())
# [('python', 10), ('python', 10), ('javascript', 5), ('R', 5), ('JAVA', 20), ('JAVA', 10)]
Leave a comment