Amazon Elastic MapReduce(EMR)은 AWS에서 제공하는 관리형 하둡 프레임워크다. EC2 인스턴스를 기반으로 클러스터를 구성하며, 대량의 데이터를 비용효율적으로 처리 할 수 있다. 하둡 기반의 빅데이터 처리 시스템을 구성 할 경우 다수의 서버를 상시 준비상태로 둬야 하는데, EMR은 필요할 때 즉시, 필요한 크기 만큼만 실행 할 수 있다.
또한 Amazon S3와 DynamoDB, Aurora RDS 와 같은 AWS 데이터 스토어와의 상호작용이 쉽기 때문에 Amazon 기반에서의 원할한 데이터 작업환경을 만들 수 있다. 예를 들어 아래와 같은 구성이 가능하다.
대량의 분석 파일을 S3로 저장한다.
EMR을 이용 S3로 부터 파일을 읽어서 분석 데이터를 만든다.
분석 데이터를 DynamoDB와 Aurora RDS에 저장해서 서비스에 사용 할 수 있도록 한다.
개발자는 Jupyter Notebook에 기반한 EMR NoteBooks을 분석 툴로 사용 할 수 있다.
EMR은 로그분석, 대량의 데이터 색인, ETL, 기계학습, 금융분석, IoT 등 광범위한 빅데이터를 처리하기 위해서 사용 한다.
예제
EMR을 이용해서 영어단어 갯수를 계산하는 간단한 프로그램을 만들어보자. 테스트에 사용할 데이터 파일을 다운로드 했다.
이 데이터에서 우리는 1900년대에 xray 단어의 출현빈도를 확인 할 수 있다. 첫번째 열은 단어이고, 두 번째는 출현 연도, 세번째 열은 총 출현횟수, 마지막 열은 출현한 책의 갯수다.
우리는 위 데이터에서 알파벳 문자로만 이루어진 (정상적인)단어를 찾아서 1999년에 어떤 단어가 나타나기 시작했는지를 살펴볼 것이다.
맵리듀스의 첫번째 단계는 mapper를 만드는 것이다. 여기에서는 특수문자등을 제거해서 깨끗한 단어를 포함한 줄(레코드)만 출력하는 프로그램을 만든다. mapper의 코드는 아래와 같다.
#!/usr/bin/env python3
import sys
def CleanWord(aword):
"""
Function input: A string which is meant to be
interpreted as a single word.
Output: a clean, lower-case version of the word
"""
# 소문자로 변환한다.
aword = aword.lower()
# 문자열에서 특수문자를 제거한다.
for character in '.,;:\'?':
aword = aword.replace(character,'')
# 문자열의 길이가 0이라면 None을 반환한다.
if len(aword)==0:
return None
# 표준영어 알파벳으로 제한한다.
for character in aword:
if character not in 'abcdefghijklmnopqrstuvwxyz':
return None
# 정리된 단어를 반환한다.
return aword
# 루프를 돌면서 표준입력 데이터를 읽는다.
for line in sys.stdin:
# 읽은 줄의 처음과 시작에 있는 화이트스페이스 문자를 제거하고
# 화이트스페이스 기준으로 분해해서 배열로 만든다.
line = line.strip().split()
# CleanWord 함수를 이용해서 소문자 영어알파벳을 만든다.
word = CleanWord(line[0])
# CleanWord 결과가 None인 경우 다음 줄로 넘어간다.
if word == None:
continue
year = int(line[1])
occurrences = int(line[2])
# Print the output: word, year, and number of occurrences
print ('%s\t%s\t%s' % (word, year,occurrences))
#!/usr/bin/env python3
import sys
# current_word will be the word in each loop iteration
current_word = ''
# word_in_progress will be the word we have been working
# on for the last few iterations
word_in_progress = ''
# target_year_count is the number of word occurrences
# in the target year
target_year_count = 0
# prior_year_count is the number of word occurrenes
# in the years prior to the target year
prior_year_count = 0
# Define the target year, in our case 1999
target_year = 1999
# 루프를 돌면서 표준입력을 읽는다.
for line in sys.stdin:
# 읽은 문자열을 탭문자로 나눈다.
line = line.strip().split('\t')
# 배열의 길이가 3이 아니라면 포맷에 맞지 않는 데이터다.
# 다음 줄을 읽는다.
if len(line)!=3:
continue
# 이 배열에는 단어, 단어가 발생한 년도, 발생 수
# 로 구성된다.
current_word, year, occurrences = line
# 만약 새로 등장한 단어라면, 카운터를 0부터 새로 시작한다.
# 문자열들은 정렬돼 있기 때문에, 이전 문자열과 다른 것으로
# 새로 등장한 단어인지를 검토 할 수 있다.
if current_word != word_in_progress:
# Word exists in target year
# 단어의 카운트가 0 보다 크면
if target_year_count > 0:
# target_year 이전에 단어가 등장하지 않았다면
# target_year에 새로 등장한 단어일 것이다.
if prior_year_count ==0:
# Print the cool new word and its occurrences
print ('%s\t%s' % (word_in_progress,target_year_count))
# 카운터들을 0으로 초기화 한다.
target_year_count = 0
prior_year_count = 0
word_in_progress = current_word
# year와 occureences가 integer이 아닐 경우
# 다음 루프를 진행한다.
try:
year = int(year)
except ValueError:
continue
try:
occurrences = int(occurrences)
except ValueError:
continue
# 변수를 추가한다.
if year == target_year:
target_year_count += occurrences
if year < target_year:
prior_year_count += occurrences
# 루프가 끝난 후에 마지막 단어가 조건에 만족하는지 검사한다.
if target_year_count > 0:
if prior_year_count ==0:
print ('%s\t%s' % (word_in_progress,target_year_count))
리듀서 프로그램은 target_year인 1999에 처음 출현한 단어와 출현빈도를 출력한다. 앞서 만든 맵퍼와 리듀서는 표준입출력으로 작동하기 때문에 shell에서도 테스트해볼 수 있다. 단 맵퍼의 결과는 정렬되어야 한다.
지금은 데이터가 4백만 줄 정도라서 개인 데스크탑에서 쉘 스크립트돌리는 것 만으로도 십초 안에 작업을 끝낼 수 있었다. 하지만 수십억 데이터라고 하면, 이야기가 달라질 것이다.
위 스크립트 예제는 크게 3개 과정으로 진행되는 걸 확인 했을 것이다.
맵퍼에서 데이터를 필터링하고
특정 필드를 키로 정렬하고
정렬한 데이터를 리듀서에게 넘겨준다.
맵리듀스도 이 3개의 과정으로 진행된다. 단지 분산된다는 점이 다를 뿐이다. 아래 그림을 보자.
EMR로 테스트해보자
하둡은 대용량의 파일을 처리하기 위해서 HDFS라는 분산파일 시스템을 파일 시스템으로 사용한다. EMR의 경우 HDFS 대신에 S3를 사용한다. HDFS에 비해서 훨씬 더 효과적으로 사용 할 수 있다. HDFS 대신에 S3를 사용해야 하는 5가지 이유 문서를 읽어보자. 결론만 요약해보자면 아래와 같다.
--ami-version : AWS EMR 버전을 설정한다. 4.0 이전의 버전이라면 --ami-version을 사용한다. 4.0 이후 버전의 경우 --release-label을 사용해야 한다. 예를 들어 --ami-version 3.1.0, --release-label emr-5.14.0 과 가이 사용 할 수 있다.
--applications : AWS Hadoop가 설치할 애플리케이션들.
--instance-groups : 인스턴스의 그룹 타입과 클러스터를 구성할 인스턴스의 타입을 설정 할 수 있다.
InstanceGroupType : MASTER, CORE, TASK 3개의 타입의 노드를 설정한다. MASTER는 하둡의 마스터노드, CORE 노드는 마스터노드가 관리하는 노드로 MapReduce, Spark 프로그램등을 실행한다. TASK논 옵션이다. 이들에 대한 자세한 내용은 EMR Master, Core, Task Node문서를 참고하자.
InstanceCount : 작업을 실행할 인스턴스의 갯수 즉 클러스터의 크기
InstanceType : m4.large와 같은 EC2 인스턴스 타입
steps : 클러스터가 실행할 작업의 목록을 설정한다. 클러스터의 생성과 작업은 분리할 수 있으므로 steps는 add-steps 명령을 이용해서 따로 실행 할 수 있다.
하지만 옵션이 엄청나게 많고 번잡스럽기 때문에, 그냥 웹 콘솔로 만들기로 했다. 자동화를 위해서는 결국 aws cli를 사용해야 겠지만, 웹 콘솔에서 클러스터를 만들면 aws cli import로 명령을 확인 할 수 있기 때문에 문제될건 없다. EMR 대시보드로 이동하자.
Create Cluster를 클릭한다.
Cluster name : 클러스터 이름
Logging : 로깅을 할 것인지
Release : emr 버전. 그냥 최신 버전 선택했다.
Application : 사용할 애플리케이션을 선택한다.
Instance type : 클러스터를 구성할 인스턴스 타입
Number of instances : 인스턴스의 갯수.
EC2 key pair : 클러스터를 구성하는 인스턴스에 들어갈 일이 있다면 필요하겠다.
EMR role, EC2 instance profile : 그냥 기본으로 한다.
Create cluster를 클릭하면 클러스터가 만들어진다.
클러스터를 만들고 나면, 상세 정보를 보여준다. AWS CLI export를 클릭하면, 이 클러스터를 만드는데 사용한 aws cli 명령을 확인 할 수 있다.
Contents
하둡(Hadoop) MapReduce
Hadoop streaming
AWS EMR
예제
EMR로 테스트해보자
EMR 수행
Amazon EMR에서의 S3N, S3A, S3 사용
정리
참고
Recent Posts
Archive Posts
Tags