Recommanded Free YOUTUBE Lecture: <% selectedImage[1] %>

Contents

이 문서는 AWS CLI 환경을 갖췄다는 가정하에 작성했다.

하둡(Hadoop) MapReduce

맵리듀스(MapReduce)는 분산&병렬처리 알고리즘을 이용 클러스터링 환경에서 빅 데이터 세트를 처리하기 위한 프로그래밍 모델및 관련 구현체를 일컫는다.

맵리듀스 프로그래밍 관련 구현체중 가장 유명한 구현체가 하둡 맵리듀스다.

Hadoop streaming

하둡 스트리밍은 Hadoop에서 배포하는 유틸리티다. 이 유틸리티를 이용하면, mapper과 reducer를 다양한 스크립트로 실행 할 수 있다. 예를 들어 아래와 같은 응용이 가능하다.
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/wc

AWS EMR

Amazon Elastic MapReduce(EMR)은 AWS에서 제공하는 관리형 하둡 프레임워크다. EC2 인스턴스를 기반으로 클러스터를 구성하며, 대량의 데이터를 비용효율적으로 처리 할 수 있다. 하둡 기반의 빅데이터 처리 시스템을 구성 할 경우 다수의 서버를 상시 준비상태로 둬야 하는데, EMR은 필요할 때 즉시, 필요한 크기 만큼만 실행 할 수 있다.

또한 Amazon S3와 DynamoDB, Aurora RDS 와 같은 AWS 데이터 스토어와의 상호작용이 쉽기 때문에 Amazon 기반에서의 원할한 데이터 작업환경을 만들 수 있다. 예를 들어 아래와 같은 구성이 가능하다.
  1. 대량의 분석 파일을 S3로 저장한다.
  2. EMR을 이용 S3로 부터 파일을 읽어서 분석 데이터를 만든다.
  3. 분석 데이터를 DynamoDB와 Aurora RDS에 저장해서 서비스에 사용 할 수 있도록 한다.
개발자는 Jupyter Notebook에 기반한 EMR NoteBooks을 분석 툴로 사용 할 수 있다.

EMR은 로그분석, 대량의 데이터 색인, ETL, 기계학습, 금융분석, IoT 등 광범위한 빅데이터를 처리하기 위해서 사용 한다.

예제

EMR을 이용해서 영어단어 갯수를 계산하는 간단한 프로그램을 만들어보자. 테스트에 사용할 데이터 파일을 다운로드 했다.
# wget http://storage.googleapis.com/books/ngrams/books/googlebooks-eng-all-1gram-20120701-x.gz
# gunzip googlebooks-eng-all-1gram-20120701-x.gz 
# head googlebooks-eng-all-1gram-20120701-x 
X'rays	1914	1	1
X'rays	1917	1	1
X'rays	1919	1	1
X'rays	1921	1	1
X'rays	1922	2	1
X'rays	1923	1	1
X'rays	1927	1	1
X'rays	1930	5	3
X'rays	1931	2	2
X'rays	1932	3	2
이 데이터에서 우리는 1900년대에 xray 단어의 출현빈도를 확인 할 수 있다. 첫번째 열은 단어이고, 두 번째는 출현 연도, 세번째 열은 총 출현횟수, 마지막 열은 출현한 책의 갯수다.

우리는 위 데이터에서 알파벳 문자로만 이루어진 (정상적인)단어를 찾아서 1999년에 어떤 단어가 나타나기 시작했는지를 살펴볼 것이다.

맵리듀스의 첫번째 단계는 mapper를 만드는 것이다. 여기에서는 특수문자등을 제거해서 깨끗한 단어를 포함한 줄(레코드)만 출력하는 프로그램을 만든다. mapper의 코드는 아래와 같다.
#!/usr/bin/env python3
 
import sys
 
def CleanWord(aword):
    """
    Function input: A string which is meant to be
       interpreted as a single word.
    Output: a clean, lower-case version of the word
    """
    # 소문자로 변환한다. 
    aword = aword.lower()
    # 문자열에서 특수문자를 제거한다. 
    for character in '.,;:\'?':
        aword = aword.replace(character,'')
    # 문자열의 길이가 0이라면 None을 반환한다. 
    if len(aword)==0:
        return None
    # 표준영어 알파벳으로 제한한다. 
    for character in aword:
        if character not in 'abcdefghijklmnopqrstuvwxyz':
            return None
    # 정리된 단어를 반환한다. 
    return aword
 
# 루프를 돌면서 표준입력 데이터를 읽는다. 
for line in sys.stdin:
    # 읽은 줄의 처음과 시작에 있는 화이트스페이스 문자를 제거하고
    # 화이트스페이스 기준으로 분해해서 배열로 만든다.
    line = line.strip().split()
    # CleanWord 함수를 이용해서 소문자 영어알파벳을 만든다. 
    word = CleanWord(line[0])
 
    # CleanWord 결과가 None인 경우 다음 줄로 넘어간다. 
    if word == None:
        continue
 
    year = int(line[1])
    occurrences = int(line[2])
 
    # Print the output: word, year, and number of occurrences
    print ('%s\t%s\t%s' % (word, year,occurrences))
실행해보자.
# cat googlebooks-eng-all-1gram-20120701-x | ./mapper.py
xrays   1914    1
xrays   1917    1
xrays   1919    1
xrays   1921    1
xrays   1922    2
xrays   1923    1
xrays   1927    1
xrays   1930    5

아래는 리듀서다.
#!/usr/bin/env python3
import sys
 
# current_word will be the word in each loop iteration
current_word = ''
# word_in_progress will be the word we have been working
# on for the last few iterations
word_in_progress = ''
 
# target_year_count is the number of word occurrences
# in the target year
target_year_count = 0
# prior_year_count is the number of word occurrenes
# in the years prior to the target year
prior_year_count = 0
 
# Define the target year, in our case 1999
target_year = 1999 
 
# 루프를 돌면서 표준입력을 읽는다. 
for line in sys.stdin:
 
    # 읽은 문자열을 탭문자로 나눈다. 
    line = line.strip().split('\t')

    # 배열의 길이가 3이 아니라면 포맷에 맞지 않는 데이터다.
    # 다음 줄을 읽는다.
    if len(line)!=3:
        continue
 
    # 이 배열에는 단어, 단어가 발생한 년도, 발생 수
    # 로 구성된다.
    current_word, year, occurrences =  line
 
    # 만약 새로 등장한 단어라면, 카운터를 0부터 새로 시작한다. 
    # 문자열들은 정렬돼 있기 때문에, 이전 문자열과 다른 것으로
    # 새로 등장한 단어인지를 검토 할 수 있다.
    if current_word != word_in_progress:
        # Word exists in target year
        # 단어의 카운트가 0 보다 크면
        if target_year_count > 0:
            # target_year 이전에 단어가 등장하지 않았다면
            # target_year에 새로 등장한 단어일 것이다.
            if prior_year_count ==0:
                # Print the cool new word and its occurrences
                print ('%s\t%s' % (word_in_progress,target_year_count))
 
        # 카운터들을 0으로 초기화 한다. 
        target_year_count = 0
        prior_year_count = 0
        word_in_progress = current_word
 
    # year와 occureences가 integer이 아닐 경우
    # 다음 루프를 진행한다.
    try:
        year = int(year)
    except ValueError:
        continue
    try:
         occurrences = int(occurrences)
    except ValueError:
        continue
 
    # 변수를 추가한다. 
    if year == target_year:
        target_year_count += occurrences
    if year < target_year:
        prior_year_count += occurrences
 
# 루프가 끝난 후에 마지막 단어가 조건에 만족하는지 검사한다.
if target_year_count > 0:
    if prior_year_count ==0:
        print ('%s\t%s' % (word_in_progress,target_year_count))
리듀서 프로그램은 target_year인 1999에 처음 출현한 단어와 출현빈도를 출력한다. 앞서 만든 맵퍼와 리듀서는 표준입출력으로 작동하기 때문에 shell에서도 테스트해볼 수 있다. 단 맵퍼의 결과는 정렬되어야 한다.
$ cat googlebooks-eng-all-1gram-20120701-x | ./mapper.py | sort -k1,1 | ./reduer.py | sort -k2,2n
...
...
xaconnectionfactory	21
xdcam	25
xmlparser	83
xadatasource	338
지금은 데이터가 4백만 줄 정도라서 개인 데스크탑에서 쉘 스크립트돌리는 것 만으로도 십초 안에 작업을 끝낼 수 있었다. 하지만 수십억 데이터라고 하면, 이야기가 달라질 것이다.

위 스크립트 예제는 크게 3개 과정으로 진행되는 걸 확인 했을 것이다.
  1. 맵퍼에서 데이터를 필터링하고
  2. 특정 필드를 키로 정렬하고
  3. 정렬한 데이터를 리듀서에게 넘겨준다.
맵리듀스도 이 3개의 과정으로 진행된다. 단지 분산된다는 점이 다를 뿐이다. 아래 그림을 보자.

 Map Reduce Flow

EMR로 테스트해보자

하둡은 대용량의 파일을 처리하기 위해서 HDFS라는 분산파일 시스템을 파일 시스템으로 사용한다. EMR의 경우 HDFS 대신에 S3를 사용한다. HDFS에 비해서 훨씬 더 효과적으로 사용 할 수 있다. HDFS 대신에 S3를 사용해야 하는 5가지 이유 문서를 읽어보자. 결론만 요약해보자면 아래와 같다.

S3 HDFS S3 vs HDFS
탄력적 운영 Yes No S3가 더욱 탄력적이다.
테라바이트당 월 비용 23$ 206$ 10X
가용성 99.99% 99.9% 10X
내구성 99.999999999% 99.9999% 10X
트랜잭션 쓰기 DBIO를 사용 YES HDFS가 좀 더 낫다.
aws cli를 이용해서 s3에 테스트용 버킷을 만들었다.
$ aws s3 mb s3://emr.joinc.co.kr
make_bucket: emr.joinc.co.kr
이 버킷에 분석할 파일과 매퍼,리듀서를 복사한다.
# aws s3 cp reduer.py s3://emr.joinc.co.kr/code/reducer.py
# aws s3 cp mapper.py s3://emr.joinc.co.kr/code/mapper.py
# aws s3 cp googlebooks-eng-all-1gram-20120701-x s3://emr.joinc.co.kr/input/NGramsX.txt

EMR 수행

AWS CLI로 EMR을 실행해보려 한다. 일반적인 사용 방법은 아래와 같다.
aws emr create-cluster --name "Test cluster" --ami-version 2.4 --applications Name=Hive Name=Pig \
--use-default-roles --ec2-attributes KeyName=myKey \
--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.large InstanceGroupType=CORE,InstanceCount=2,InstanceType=m4.large \
--steps Type=PIG,Name="Pig Program",ActionOnFailure=CONTINUE,Args=[-f,s3://mybucket/scripts/pigscript.pig,-p,INPUT=s3://mybucket/inputdata/,-p,OUTPUT=s3://mybucket/outputdata/,$INPUT=s3://mybucket/inputdata/,$OUTPUT=s3://mybucket/outputdata/]
  • emr create-cluster : 클러스터를 만든다.
  • --name : 클러스터의 이름을 설정한다.
  • --ami-version : AWS EMR 버전을 설정한다. 4.0 이전의 버전이라면 --ami-version을 사용한다. 4.0 이후 버전의 경우 --release-label을 사용해야 한다. 예를 들어 --ami-version 3.1.0, --release-label emr-5.14.0 과 가이 사용 할 수 있다.
  • --applications : AWS Hadoop가 설치할 애플리케이션들.
  • --instance-groups : 인스턴스의 그룹 타입과 클러스터를 구성할 인스턴스의 타입을 설정 할 수 있다.
    • InstanceGroupType : MASTER, CORE, TASK 3개의 타입의 노드를 설정한다. MASTER는 하둡의 마스터노드, CORE 노드는 마스터노드가 관리하는 노드로 MapReduce, Spark 프로그램등을 실행한다. TASK논 옵션이다. 이들에 대한 자세한 내용은 EMR Master, Core, Task Node문서를 참고하자.
    • InstanceCount : 작업을 실행할 인스턴스의 갯수 즉 클러스터의 크기
    • InstanceType : m4.large와 같은 EC2 인스턴스 타입
  • steps : 클러스터가 실행할 작업의 목록을 설정한다. 클러스터의 생성과 작업은 분리할 수 있으므로 steps는 add-steps 명령을 이용해서 따로 실행 할 수 있다.
하지만 옵션이 엄청나게 많고 번잡스럽기 때문에, 그냥 웹 콘솔로 만들기로 했다. 자동화를 위해서는 결국 aws cli를 사용해야 겠지만, 웹 콘솔에서 클러스터를 만들면 aws cli import로 명령을 확인 할 수 있기 때문에 문제될건 없다. EMR 대시보드로 이동하자.

Create Cluster를 클릭한다.

  • Cluster name : 클러스터 이름
  • Logging : 로깅을 할 것인지
  • Release : emr 버전. 그냥 최신 버전 선택했다.
  • Application : 사용할 애플리케이션을 선택한다.

  • Instance type : 클러스터를 구성할 인스턴스 타입
  • Number of instances : 인스턴스의 갯수.
  • EC2 key pair : 클러스터를 구성하는 인스턴스에 들어갈 일이 있다면 필요하겠다.
  • EMR role, EC2 instance profile : 그냥 기본으로 한다.
Create cluster를 클릭하면 클러스터가 만들어진다.

클러스터를 만들고 나면, 상세 정보를 보여준다. AWS CLI export를 클릭하면, 이 클러스터를 만드는데 사용한 aws cli 명령을 확인 할 수 있다.
aws emr create-cluster --applications Name=Ganglia Name=Hadoop Name=Hive Name=Hue Name=Mahout Name=Pig Name=Tez --ec2-attributes '{"KeyName":"joinc_test","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-9e1502f6","EmrManagedSlaveSecurityGroup":"sg-0a9e006395552860f","EmrManagedMasterSecurityGroup":"sg-0a8012b8f0c91eb6f"}' --service-role EMR_DefaultRole --enable-debugging --release-label emr-5.24.0 --log-uri 's3n://aws-logs-522373083963-ap-northeast-2/elasticmapreduce/' --name 'My cluster' --instance-groups '[{"InstanceCount":2,"InstanceGroupType":"CORE","InstanceType":"r3.xlarge","Name":"Core Instance Group"},{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"r3.xlarge","Name":"Master Instance Group"}]' --scale-down-behavior TERMINATE_AT_TASK_COMPLETION --region ap-northeast-2
상당히 복잡하다.

클러스터는 만들어지는데 시간이 걸린다. 처음에는 Starting상태로 Waiting상태가 되야 비로서 클러스터를 사용 할 수 있다. 클러스터 규모에 따라서 달라질 수 있겠는데, 내 경우 클러스터가 준비되는데 8분 정도의 시간이 걸렸다.
$ aws emr list-clusters --active
{
    "Clusters": [
        {
            "Id": "j-3KGZHWMIER9RH",
            "Name": "My cluster",
            "Status": {
                "State": "STARTING",
                "StateChangeReason": {
                    "Message": "Configuring cluster software"
                },
                "Timeline": {
                    "CreationDateTime": 1561041365.074
                }
            },
            "NormalizedInstanceHours": 0
        }
    ]
}
  • ID : Cluster 식별을 위한 아이디
  • State : STARTING. 아직 준비중임을 알 수 있다.
$ aws emr list-clusters --active
{
    "Clusters": [
        {
            "Id": "j-3KGZHWMIER9RH",
            "Name": "My cluster",
            "Status": {
                "State": "WAITING",
            // .... 생략 
        }
    ]
}
WAITING 상태다. 준비완료. 이제 작업을 실행하면 된다.

클러스터의 Steps 탭에서 Add step를 클릭한다.

  • Step type : Custom Jar, Hive, Pig, Streaming 중 선택 할 수 있다. Streaming program 을 선택했다.
  • Mapper : 맵퍼를 선택한다.
  • Reducer : 리듀서를 선택한다.
  • Input s3 location : 처리할 데이터 파일.
  • Output S3 location : 작업 결과 파일을 저장할 디렉토리. 같은 이름의 디렉토리가 있다면 에러를 출력한다.
작업이 완료됐는지 확인해보자.
$ aws emr describe-step --cluster-id j-3KGZHWMIER9RH --step-id s-7B1F1Y7FEESB
{
    "Step": {
        "Id": "s-7B1F1Y7FEESB",
        "Name": "Streaming program",
        "Config": {
            "Jar": "command-runner.jar",
            "Properties": {},
            "Args": [
                "hadoop-streaming",
                "-files",
                "s3://emr.joinc.co.kr/code/mapper.py,s3://emr.joinc.co.kr/code/reducer.py",
                "-mapper",
                "mapper.py",
                "-reducer",
                "reducer.py",
                "-input",
                "s3://emr.joinc.co.kr/input/NGramsX.txt",
                "-output",
                "s3://emr.joinc.co.kr/output/"
            ]
        },
        "ActionOnFailure": "CONTINUE",
        "Status": {
            "State": "COMPLETED",
            "StateChangeReason": {},
            "Timeline": {
                "CreationDateTime": 1561042395.67,
                "StartDateTime": 1561042406.848,
                "EndDateTime": 1561042494.977
            }
        }
    }
}
성공했다. 결과파일이 잘 만들어졌는지 확인해 보자.
$ aws s3 ls s3://emr.joinc.co.kr/output/
2019-06-20 23:54:51          0 _SUCCESS
2019-06-20 23:54:42         97 part-00000
2019-06-20 23:54:42         55 part-00001
2019-06-20 23:54:43         45 part-00002
2019-06-20 23:54:43         90 part-00003
2019-06-20 23:54:48        123 part-00004
2019-06-20 23:54:51         97 part-00005
2019-06-20 23:54:51         76 part-00006
파일을 다운로드 해서, 원하는 결과가 나왔는지 검증해 보기로 했다.
$ aws s3 sync s3://emr.joinc.co.kr/output/ ./
$ cat * > tmp.txt 
$ cat tmp.txt| sort -k2,2n > result.txt
마지막으로 앞서 쉘 스크립트와 파이선 스크립트를 이용한 MR 결과 파일을(pymr.txt) 비교한다.
$ diff result.txt pymr.txt
100% 일치하는 걸 확인했다.

Amazon EMR에서의 S3N, S3A, S3 사용

정리

  • 세상 편해졌다.
  • 클라우드 서비스의 장점은 "빠른 성공"이 아닌 "빠른 실패"에 있다는 것을 체감 할 수 있었다. 한번의 성공을 위해서는 10번의 실패가 있어야 하는데, 그 19번의 실패를 아주~~ 자유롭게 빠르게 할 수 있다.
  • 하둡 스트리밍은 표준출력/표준입력 기반으로 데이터를 처리하기 때문에, 언어에 상관없이 MR을 작성 할 수 있다.
  • 왠지 빅데이터에 대한 자신감이 붙어버렸다.!?

참고