Apache Spark와 RDD

Apache Spark 코드로 알아보기


개념 간단정리


Apache Spark

대용량의 데이터를 효율적으로 처리하는 빅데이터 분산처리 플랫폼

Spark 프로그램은 SparkSession을 만드는 것부터 시작 - Spark세션을 통해 Spark가 제공하는 다양한 기능을 사용 가능

공식문서 (https://spark.apache.org/docs/latest/cluster-overview.html)


RDD

Apache Spark의 기본적인 자료구조


RDD 연산

RDD 연산의 2가지 종류

  1. Transformations
  2. Actions

코드진행


  • 실습환경 체크
# 버전 확인
import pyspark
import numpy as np
import pandas as pd
import matplotlib as mpl
import seaborn as sns

print(pyspark.__version__)
print(np.__version__)
print(pd.__version__)
print(mpl.__version__)
print(sns.__version__)


SparkSession 만들기

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("my1stSpark").getOrCreate()
spark

1


RDD


# 0부터 10까지 데이터
num_values = range(10)

# RDD 객체 생성
num_values = spark.sparkContext.parallelize(num_values)

# 객체 타입 확인
print(num_values)
# ParalPythonRDD[11] at RDD at PythonRDD.scala:53lelCollectionRDD[4] at readRDDFromFile at PythonRDD.scala:274


  • RDD Transformation & Action
# 생성한 객체를 세제곱

# RDD Transformation 
cubic_values = num_values.map(lambda x : pow(x,3))

# RDD Action
for num in cubic_values.collect():
    print(num)
#0
#1
#8
#27
#64
#125
#216
#343
#512
#729


  • README.md 파일 분석하기
# 경로설정
file_path = 'data/README.md'

# 객체 생성
fileRDD = spark.sparkContext.textFile(file_path)

# Spark Transformation
# filter : Spark 글자를 포함한 line
fileRDD_filter = fileRDD.filter(lambda line : 'Spark' in line)
print(fileRDD_filter)
# PythonRDD[28] at RDD at PythonRDD.scala:53

# Spark Action
print(fileRDD_filter.count())
# 19

for line in fileRDD_filter.take(4):
    print(line)
# # Apache Spark
# Spark is a unified analytics engine for large-scale data processing. It provides
# rich set of higher-level tools including Spark SQL for SQL and DataFrames,
# [![PySpark Coverage](https://codecov.io/gh/apache/spark/branch/master/graph/badge.svg)]
# (https://codecov.io/gh/apache/spark)


PairRDD


  • PairRDD생성
# 데이터 생성
data = [('python',10),('javascript',5),('JAVA',20), ('python',10),('R',5),('JAVA',10)]
data
#[('python', 10),
# ('javascript', 5),
# ('JAVA', 20),
# ('python', 10),
# ('R', 5),
# ('JAVA', 10)]

# 데이터를 PairRDD로 변환
regi_lan = spark.sparkContext.parallelize(data)
print(regi_lan)
# ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:274

# 데이터 타입체크
print(type(regi_lan))
# <class 'pyspark.rdd.RDD'>

# Action Method로 값 확인해보기
print(regi_lan.collect())
# [('python', 10), ('javascript', 5), ('JAVA', 20), ('python', 10), ('R', 5), ('JAVA', 10)]


# groupbyKey 사용 예시 (sorted : 정렬)

# key값별 갯수
sorted(regi_lan.groupByKey().mapValues(len).collect())
# [('JAVA', 2), ('R', 1), ('javascript', 1), ('python', 2)]

# key값별 데이터
sorted(regi_lan.groupByKey().mapValues(list).collect())
# [('JAVA', [20, 10]), ('R', [5]), ('javascript', [5]), ('python', [10, 10])]

# 객체 생성
group_rdd = regi_lan.groupByKey().collect()

# 객체의 key값별 데이터를 한줄로
for keys, values in group_rdd:
    print(keys, "-->", list(values))
# python --> [10, 10]
# javascript --> [5]
# JAVA --> [20, 10]
# R --> [5]


# reduceByKey 사용 예시 - Key값별로 계산해보기

# 더하기
regi_lan.reduceByKey(lambda x1, x2 : x1 + x2).collect()
# [('python', 20), ('javascript', 5), ('JAVA', 30), ('R', 5)]

# 뺄셈
regi_lan.reduceByKey(lambda x1, x2 : x1 - x2).collect()
# [('python', 20), ('javascript', 5), ('JAVA', 30), ('R', 5)]


# sortByKey 사용 예시

# Key별로 정렬
print(regi_lan.sortByKey().collect())
# [('JAVA', 20), ('JAVA', 10), ('R', 5), ('javascript', 5), ('python', 10), ('python', 10)]

# 역순 정렬
print(regi_lan.sortByKey(ascending=False).collect())
# [('python', 10), ('python', 10), ('javascript', 5), ('R', 5), ('JAVA', 20), ('JAVA', 10)]

Categories:

Updated:

Leave a comment