Recommanded Free YOUTUBE Lecture: <% selectedImage[1] %>

Contents

Spark 3의 새로운 기능들

Spark의 최신 버전은 "Spark 3"다. Spark의 역사를 대략 정리했다.

 Spark 역사

Spark는 2009년 UC Berkeley AMPLab에서 빅 데이터 분석을 위한 시스템 개발을 목표로 시작했다. 2014년 Spark 1.0 이 릴리즈 되고 같은해 탑-레벨 아파치프로젝트가 된다. 아파치 재단의 주요 프로젝트들을 둘러보자. 기술을 선택하는데 큰 도움이 될 것이다.

2018년 Spark에서 분산 ML 프레임워크 기능을 제공하기 위한 Project Hydrogen가 활동을 시작한다.

2019년 Spark 3 preview 버전이 릴리즈 된다. 올해(2020년)에는 Spark3가 정식 릴리즈되지 않을까 싶다. Spark 3에서 달라지는 점들을 정리했다.

속도

Spark 2에 비해서 빠르다. 기능에 따라 다르겠으나 17배 이상 빠른 벤치마크 결과도 있다.

언어지원

Python 3와 Scala 2.12을 주요 버전으로 사용한다. Python 2는 더 이상 사용하지 않는다.

Adaptive execution of Spark SQL

Spark SQL은 Spark 의 중요 기능 중 하나다. 3.0에서는 Adaptive execution of Spark SQL을 지원한다.

Dynamic Partition Pruning

Dynamic Partition Pruning(DPP, 동적 파티션 정리) 기능을 지원한다. Spark 플랫폼에서 데이터웨어 하우스 기능이 중요해지고 있다. DPP는 차원 테이블(dimension table)에 필터 세트를 팩트 테이블(fact table)에 직접 적용해서 불필요한 파티션 스캔을 건너뛰게 하는 방법으로 성능을 향상시킨다.

향상된 Deep Learning 지원

물론 3.0 버전 이전의 Spark도 딥 러닝을 지원했다. 그러나 Spark MLlib는 딥 러닝에 중점을 두지 않았다. 딥러닝 알고리즘을 제공하지 않고 있으며 특히 이미지 처리와 관련된 많은 것들을 제공하지 않고 있다. TensorFlowOnSpark, MMLSpark 등을 이용해서 가능은 했지만 부드럽게 통합되지는 않았다.

Spark 3.0에서는 딥 러닝 기능이 강화됐다. Nvidia, AMD, Intel 등 다양한 GPU에 대한 지원을 추가하고 동시에 여러 유형의 GPU를 사용 할 수 있게 됐다. 또한 Vectorized UDF 가속을 위해서 GPU를 사용 할 수 있다. GPU 노드가 포함된 kubernetes 클러스터를 구성한다면, 유연하게 GPU를 사용 할 수 있다.

Kubernetes와의 통합

Spark 2.0 도 Kubernetes위에서 운용 할 수 있었지만 성숙하지는 않았으며, YARN 클러스터 관리자와 비교해서 제품레벨에서 사용하기에 어려웠다. Spark 3.0은 Spark on Kubernetes를 위한 새로운 "shuffle servicer"를 제공한다. 이를 이용해서 동적 확장과 축소가 가능하다.

Graph

그래프(Graph)는 추천엔진 및 사기 탐지등을 포함한 여러 분야에서 사용 할 수 있다. Spark 3.0은 그래프처리를 위한 SparkGraph라는 완전한 새로운 모듈이 도입되었다. Neo4J가 많은 기여를 한 모듈로 이전에는 Cypher for Spark로 불렸으며, Spark 구성안에서는 SparkGraph로 부르고 있다.

ACID Transaction with Delta Lake

Delta Lake는 Apache Spark 3.0에 ACID 트랜잭션을 제공하기 위한 오픈소스 스토리지 계층이다. 데이터 레이크에 있는 데이터가 여러 사용자에 의해서 동시에 수정될 때 나타나는 문제를 해결 할 수 있다.

Spark 소개

인터넷 서비스를 제공하는 기업은 서비스에 접근하는 사용자의 모든 것들을 저장하려고 한다. 적절한 시점에 적절한 정보를 유저에게 제공해서, 유저로 하여금 더 많은 돈을 지불하도록 해서 더 많은 수익을 얻기 위해서는 유저의 활동을 분석해야 하기 때문이다. 인터넷 쇼핑몰 기업이라면, 유저가 방문한 상품, 구매한 상품, 방문 시간, 물류의 상태를 기록하고 모니터링 해야 한다. 이러한 정보들은 웹로그(weblogs), 위치데이터, 개인정보(성별, 나이) 형태로 쌓이게 된다. 이러한 데이터는 매우 커질 수 있으므로 기업은 이를 분석하기 위한 시스템을 필요로 한다.

Spark는 여러 대의 컴퓨터로 구성된 클러스터링 시스템을 만들어서 데이터를 분석한다. 거대한 데이터를 여러 개의 파일로 쪼개고 이를 여러 대의 컴퓨터에 분산해서 처리하도록 명령하고 그 결과를 통합하는 방식이다.

 Spark의 데이터 처리 방식

Spark 플레임워크는 "Spark 애플리케이션"을 여러 개의 컴퓨터 노드에서 분산실행한다. Spark 애플리케이션은 Driver ProgramExecuter 프로세스로 구성이 된다. 클러스터는 "Master Node"와 "Slave Node"로 구성된다. Master Node는 Cluster Manager Spark 클러스터에 있는 하나 이상의 Salve Node들을 관리한다. 예를 들어 하둡(Hadoop)을 사용한다면 YARN을 이용해서 클러스터를 관리하게 될 것이다.

Cluster Manager는 클러스터를 확장(새로운 노드의 추가)과 축소(노드를 제거)하고, 모니터링 하면서 노드에 문제가 생겼을 때, 클러스터 실패방지 및 클러스터 복구, 자원이 남는 노드를 찾아서 할당하는 등의 작업을 수행한다.

데이터는 Slave Node에서 실행되는 Excutor 가 처리한다.

전체 과정을 정리했다.
  1. Driver Program이 실행되고, Spark Context 객체를 생성한다.
  2. Spark Context는 Cluster Manager에 연결한다.
  3. Cluster Manager는 클러스터내에서 사용 할 수 있는 Executor를 할당한다.
  4. Executor를 할당받은 Spark Context는 Jar 혹은 python 코드를 Executor에 전달한다.
  5. 코드를 받은 Executor은 코드를 실행해서 작업을 수행한다.

MapReduce와 Spark

Spark는 하둡의 분산처리 시스템인 MapReduce와 아주 비슷하다. MapReduce와 Spark의 차이점은 아래와 같다.

아이템 MapReduce Apache Spark
Data Processing Only Batch Processing Batch Processing & Real Time data Processing
Processing Speed Slow(디스크 ID를 사용한다.) 메모리를 사용한다. 100배 이상 빠르다.
Category 데이터 프로세싱 엔진 데이터 분석 엔진
Costs Spark 보다 싸다. 메모리를 사용하기 때문에 비싸다.
Scalability 클러스터 하나에 1000개 노드 제한 클러스터 하나에 1000개 노드 제한
Machine Learning Apache Mahout와 잘 통합된다. ML을 위한 API를 내장하고 있다.
Scheduler 외부 스케쥴러를 사용한다. 자체 스케쥴러를 사용한다.
Fault Tolerance 복제 기반 RDD 기반
Ease of Use Spark에 비해서 어렵다. 사용하기 쉬운 API를 제공한다. MR 보다 쉽다.
Language Java, C, C++, Ruby, Python, Groovy Java, Scala, Python, R
Latency Very High Latency MR 보다 빠르다.
Complexity 코딩과 디버깅이 어렵다. 코딩과 디버깅이 쉽다.
Coding 더 많은 코드 적은 코드
Interactive Mode Not interactive interactive
SQL Hive Query Spark SQL
Spark는 DAG(Directed Acyclic Graph) 기능을 제공한다. DAG는 RDD에 적용된 작업을 추적하는 그래프다. DAG를 이용해서 중단된 작업을 복구하는 등의 높은 내결함성을 가지는 데이터처리 시스템을 구축할 수 있다.

Spark를 사용하는 기업들

많은 기업과 공공조직들이 Spark를 사용하고 있다. 비슷한 프레임워크인 MapReduce에 비해서 역사가 짧은 이유로 사용 기업 목록은 MR 보다 적긴하지만 빠르게 증가하고 있다.

쉽게 배울 수 있다

Spark 프레임워크는 Python, Java, Scala 언어를 제공한다. MR에 비해서 쉽게 코드를 만들 수 있으며 특히 Python을 사용 할 경우 생산성을 크게 높일 수 있다. Spark는 RDD(Resilient Distributed Dataset)이라는 핵심 개념을 제공한다. RDD는 "분산된 변경 불가능한 데이터셋"이다. 이들 RDD는 여러 노드에 분산되서 데이터 연산을 수행한다. 각각의 RDD는 데이터 셋에 적용된 모든 변환 작업을 추적하는데, 따라서 연산속도를 높이고 문제가 발생했을 때 빠르게 추적/복구 할 수 있다. 이러한 작업은 MapReduce 보다 더 적은 코드로 수행 할 수 있다.

Spark 컴포넌트

 Apache Spark 컴포넌트

Apache Spark는 Apache Spark Core, Spark SQL, Spark Streaming, MLLib, GraphX 5개의 컴포넌트로 구성된다.

Apache Spark Core
Spark Core는 아키텍처의 모든 것을 제공하는 핵심영역으로 아래의 기능을 제공한다.
  1. 작업의 배포(Distributing tasks) : 배포작업
  2. Programming
  3. Input/Output 작업
Java, Python, Scala, R 프로그래밍 인터페이스를 사용하며 RDD의 추상화에 중점을 둔다. RDD에서 맵, 필터 또는 리듀스 작업을 병렬로 수행 할 수 있는 기능 모델을 설정한다. 이들 함수를 Spark로 전달해서 클러스터에서 병렬로 실행 하도록 계획을 수립한다.

Spark SQL

Spark sQL은 SchemaRD 라는 데이터 추상화 개념을 도입해서 구조화된 데이터와 반 구조화된 데이터의 처리를 지원한다. 아래의 툴을 이용해서 데이터를 조작 할 수 있다.

  • Scala, Java, Python 등의 언어를 이용한 조작
  • ODBC/JDBC 컨넥터를 이용한 조작

Spark Streaming

Spark Streaming는 실시간 처리를 담당한다. 배치처리와 스트리밍 워크로드를 모두 지원하는 확장 가능한 내결삼성 스트리밍 처리 시스템이다. Spark Streaming는 Kafka, Flume, Amazon Kinesis를 비롯한 다양한 소스의 실시간 데이터를 처리 할 수 있다.

MLlib (Machine Learning Library

MLlib는 Apache Spark의 머신러닝 라이브러리다. Spark API와 잘 통합되며 Python, R, Scala, 자바를 모두 지원한다. 다만 MLlib가 모든 언어를 지원하는 것은 아니다. 메인 언어는 Scala 이며, 언어에 따라서 제약사항이 있다.

Spark MLlib는 classification, regression, clustering, collaboration filtering, demensionality reduction 등을 포함하는 일반적인 러인 알고리즘들로 구성된다. Spark MLib는 Spark SQL, Spark Streaming 및 DataFrames와 같은 다른 Spark 구성요소와 완벽하게 통합된다.

GraphX

GraphX는 병렬 그래프 연산을 위한 Spark 구성요소다. GraphX는 Spark RDD 개념을 사용해서 그래프 분석 작업을 단순화/효율화 한다. 그래프는 특히 검색엔진과 소셜 네트워크에서의 관계를 분석하기 위해서 널리 사용하고 있다.

 소셜 네트워크

RDD

RDD(Resilient Distributed Dataset)은 Spark의 기본 자료구조이며, 가장 중요한 구성요소 중 하나다. Spark가 분산 시스템이라는 것을 생각해 보면 Distirubted Dataset 즉 데이터를 분산해서 관리한다는 것은 쉽게 유추할 수 있을 것이다.

앞서 Spark는 데이터를 디스크가 아닌 메모리에서 관리한다고 했는데, 그렇다면 중간에 데이터에 문제가 생길 경우 이를 효과적으로 복원할 수 있는 어떤 방법이 필요하다. Spark는 immutable(불변) 한 데이터셋을 만드는 것으로 이 문제를 해결 했다. RDD는 아래와 같은 특징을 가진다.
  1. Resilient
  2. Distributed
  3. Dataset
  4. Lazy evaluated
  5. Immutable : 읽기 전용. 디스크에서 RDD를 만들거나, RDD에서 다른 RDD로 변환하는 것은 가능하지만 RDD의 내용을 수정 할 수는 없다.
 PARTITONS

RDD에 대해서는 TransformationsActions두 가지 오퍼레이션이 있다.

Transformation : 존재하는 RDD를 변환해서 새로운 RDD를 만든다. Spark의 연산은 lazy evaluation 으로 수행된다. 변환 연산을 즉시 수행하지 않고 모아뒀다가 최적의 방법을 찾아서 처리한다.

여기에서 말하는 최적화는 대부분 지역성에 관한 것이다. 비슷한 일을 하는 연산을 모아서 처리하는 방식이다. 예를 들어서 A 상점과 B 상점에서 물건을 구매해야 한다면, 따로따로 여러 번 사오게 하는 대신에 주문을 받아서 시장을 방문 했을 때 한번에 구매하는 게 효율 적일 것이다.

하나의 RDD 입력으로 부터 하나 이상의 RDD가 만들어질 수 있다.

Actions : 결과값이 정수나 리스트, 맵 등과 같이 RDD가 아닌 것이다. Actions 역시 Lazy evaluation 방식이다.

Transformations 연산을 정리했다.
  • map(func)
  • flatMap(func)
  • filter()
  • mapPartitions()
  • mapPartitionWithIndex
  • union(dataset)
  • intersection(other-dataset)
  • distinct()
  • groupByKey()
  • reduceByKey(func, [numTasks])
  • sortByKey()
  • join()
  • coalesce()
아래는 Action 연산이다.
  • count()
  • collect()
  • take(n)
  • top()
  • countByValue()
  • reduce()
  • fold()
  • aggregate()
  • foreach()

Jobs, Stages & Tasks

RDD의 실행은 job, stages, tasks로 구성된다.

  • RDD에서 action을 호출하면 "job"이 만들어진다. Job은 Spark에 제출된 작업이다.
  • shuffle 경계에 따라서 job은 여러 개의 "stage"로 나뉜다.
  • 각 stage는 RDD의 파티션 수에 따라서 "task"로 나뉜다. 따라서 task는 Spark의 가장 작은 작업단위가 된다.
 Jobs, Stages, Tasks

DAG

RDD는 Resilient 특성을 가진다고 했다. 문제가 생겼을 때 복구가능하다는 것인데, DAG(Directed Acyclic Graph)를 이용해서 이를 구현하고 있다. 아이디어는 간단하다.
  1. RDD는 읽기 전용이다.
  2. 그렇다면 RDD가 변환됐을 때의 히스토리를 기록해 둔다면 쉽게 복구 할 수 있을 것이다.
Spark는 RDD가 부모로 부터 어떻게 만들어졌는지 계보(lineage)를 기록하는 것으로 fault-tolerant 한 시스템을 구성한다.

 DAG

Ratings Histogram Walkthrough

from pyspark import SparkConf, SparkContext
import collections

conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
sc = SparkContext(conf = conf)

lines = sc.textFile("./u.data")
ratings = lines.map(lambda x: x.split()[2])
result = ratings.countByValue()

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
    print ("%s %i" % (key, value))

앞장에서 다루었던 ratings-count.py 프로그램이다. 이 코드에서 RDD는 1점부터 5점까지의 각 등급별 영화 갯수에 대한 막대 그래프 데이터를 출력 할 것이다.

 Rating Histogram walkthrough 그래프

한줄 한줄 정확하게 분석해 보자.
from pyspark import SparkConf, SparkContext
import collections
모든 Spark 애플리케이션들은 위의 패키지들을 import 한다.
  • SparkConf : Spark 애플리케이션을 설정한다. 다양한 spark 매개변수를 Key-Value 쌍으로 설정 할 수 있다.
  • SparkContext : Spark 애플리케이션의 main 진입 점(entry point)다. SparkContext는 Spark Cluster로의 연결을 나타내며, 해당 클러스터에서 RDD 및 broadcast 변수를 작성하는데 사용할 수 있다.
  • collections : Spark와는 관계 없는 python 모듈이다. python의 범용 내장 컨테이너인 dict, list, set, tuple 보다 나은 컨테이너를 제공한다.
많은 Spark 애플리케이션들이 아래 코드와 비슷하게 시작 할 것이다.
conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
sc = SparkContext(conf = conf)
Spark는 로컬 머신과 원격 병렬 머신에서 실행 할 수 있다. setMaster() 메서드를 이용해서 실행환경을 설정 할 수 있다. 이 예제는 로컬 머신에서 실행하므로 "local"을 설정했다. 원격 Spark 마스터에 연결 할 경우에는 "spark://spark-master:7077" 과 같이 설정하면 된다. setAppName()메서드로 애플리케이션 이름을 설정할 수 있다. 지금은 하나의 Spark 애플리케이션을 실행하지만 클러스터를 운영하게되면 여러개의 Spark 애플리케이션을 실행 하게 된다. 이 때 애플리케이션 이름을 이용해서 각 Spark 애플리케이션을 모니터링 할 수 있다.

SparkContext로 JVM을 시작하고 JavaSparkContext를 작성한다. SparkContext를 이용해서 Spark 클러스터에 대한 연결을 획득 할 수 있다. 여기에서 "Spark Context sc"를 얻었다. 지금은 로컬 머신에서 테스트를 하고 있지만, 나중에는 분산 시스템에서 작동하는 Spark 애플리케이션을 만들 기회가 있을 것이다.

분석할 데이터 파일을 읽어보자.
lines = sc.textFile("./u.data")
textFile 메서드를 이용해서 HDFS, 로컬 파일 시스템, s3 등에서 파일을 읽을 수 있다. 이 코드는 로컬에 있는 u.data 파일을 읽는다. 이 파일에는 10만개(100k)의 영화 평점 정보가 저장돼 있다.
# wc u.data
100000  400000 1979173 u.data

파일의 내용은 아래와 같다.
196 242 3   881250949
186 302 3   891717742
22  377 1   878887116
244 51  2   880606923
166 346 1   886397596

textFile 메서드는 파일을 줄 단위로 읽는다. 각 줄은 RDD의 값 하나에 해당한다. 즉 1번째 줄은 RDD의 첫번째 값이 되고, 두번째 줄은 RDD의 두번째 값이 된다. u.data 파일이 위의 6개의 줄로 이루어져 있다면, 이 RDD는 6개의 값으로 구성 된다.

ratings = lines.map(lambda x: x.split()[2])
lines.map은 각 줄에 대해서 lambda 함수를 실행한다. 이 lambda 함수는 각 줄을 매개변수 x로 받아서 x.split()를 실행 해서 배열로 만들어서 3번째 배열 값을 반환한다. 그 결과 평점 정보를 가지는 새로운 RDD가 만들어진다.

 spark map

이제 원본 데이터로 부터 10만건의 평점 데이터를 가지고 있는 RDD가 만들어졌다. 이제 각 평점을 카운팅해야 한다.
result = ratings.countByValue()
이 RDD에는 "3 3 1 2 ..."를 포함하고 있는데, 같은 값(Value)에 대해서 count하는 작업을 수행해야 한다. pyspark에서 제공하는 Action 연산인 countByValue()를 이용하면 된다. 이 연산의 작동 방식을 묘사했다.

 countByValue Action

10만건에 대해서 countByValue() action을 적용하면 평점 데이터가 만들어진다. 이 평점 데이터를 보기 좋게 정렬하자.
sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
    print ("%s %i" % (key, value))
결과물을 sorted 내장 함수로 정렬하고 OrderedDict로 입/출력을 고정(입력한 순서대로 출력)하면 마무리된다.

Key/Value RDD's and the Average Friends by Age Example

참고