Recommanded Free YOUTUBE Lecture: <% selectedImage[1] %>

Contents

MapReduce 소개

MapReduce는 대량의 자원을 다루는 분산/병렬 시스템의 효율적인 지원을 위한 목적으로 Google(:12)에서 만들어낸 프로그래밍 모델이다. 테라바이트 단위의 데이터를 처리해야 하는 Google의 입장에서 분산/병렬 시스템을 지원하기 위한 이러한 툴의 개발은 필연적인 요구사항이였을 것이다. 분산/병렬 시스템의 효과적인 지원을 위해서는 다음의 사항들이 만족되어야 한다.
  1. 병렬처리 : 하나의 거대한 데이터는 여러개의 분산된 시스템으로 보내어져서 병렬적으로 처리될 수 있어야 한다.
  2. fault-tolerance : 병렬시스템중 몇개에 문제가 생기더라도 전체 시스템에 영향을 주면 안된다. 자동으로 관리할 수 있어야 한다.
  3. 데이터분산 및 로드밸런싱

프로그래밍 모델

용어에서 알 수 있듯이 MapReduce(:12) 는 Map과 Reduce가 합쳐진 용어로, Map 함수와 Reduce 함수의 조합을 통해서 분산/병렬 시스템운용을 지원한다. MapReduce는 데이터를 {Key, Value} 의 쌍으로 만들고 이를 처리하는 프로세스를 가진다.

Map은 사용자 정의 자료구조이며, 입력데이터에서 Key/Value 쌍으로 이루어진 중간 데이터 형태의 데이터를 만들어낸다. MapReduce 라이브러리는 Value에 대해서 Key I를 함께 관리한다. 그리고 이 중간데이터는 Reduce 함수로 넘겨진다. Reduce 함수역시 유저에 의해서 작성되며, Key I와 Key I에 대한 Value를 받아들이게 된다. 그리고 Key값을 이용해서 Value값을 합치게 된다.

MapReduce 테스트

그럼 MapReduce 를 테스트 프로그램의 예를 들어서 이해를 해보도록 하겠다.

만들고자 하는 프로그램은 여러개의 문서로부터 각각의 단어가 몇개씩 있는지를 검사하는 프로그램이다. 제대로 테스트 하기 위해서는 분산시스템 환경을 만들어야 겠지만, 상황이 여의치 않으므로 로컬시스템에서 IPC(:12)를 이용해서 리모트 분산 환경을 흉내내도록 하겠다.

https://lh6.googleusercontent.com/-8PLGAYp-Q2s/UAermzHfyqI/AAAAAAAACWs/ILkT9ZGDshw/s640/mapreduce.png

분석해야 하는 여러개의 입력 문서를 fork(2)된 worker 프로세스가 읽어들여서 Map으로 만든다음, 그 결과를 중간파일(Intermediate Files)로 저장한다. 저장한 중간파일은 다시 Recude worker 프로세스가 읽어들여서 최종 결과파일을 생성한다.

이를 리모트환경에서 구현을 한다면 Map worker 프로세스가 생성한 중간파일은 로컬 파일시스템에 저장되고, 원격지에 설치되어 있는 Reduce worker 프로세스가 파일을 잃어 가는 형식이 될것이다. User 프로그램은 각각의 worker프로세스에 작업을 할당하고 관리하는 일을 하게 된다.

로컬시스템에서 위의 환경을 흉내내고자 한다면 Reduce worker 프로세스가 IPC(:12)를 이용해서 중간파일을 읽어 가도록 바꾸면 될것이다.

응용

위의 구조는 MapReduce 시스템의 구조를 단적으로 보여준다. 다만 응용범위와 환경에 따라서 다양한 방식으로의 구성이 가능할 것이다. 여기에서는 몇개의 영문서에서 단어를 카운팅하는 카운팅엔진으로 구현응용에 대해서 알아보도록 하겠다.

전체 흐름

단순하게 생각하면, 파일단위로 map worker에 처리를 넘기도록 하면 되겠지만 이는 제대로된 병렬/분산 환경이라고 보기는 힘들다. 어떤 파일은 1Mega이고 어떤 파일은 100Mega일 수 있기 때문이다. 그러므로 여러개의 파일을 하나의 파일처럼 인식을 하고, 이것을 일정한 크기로 쪼개어서 읽어들이도록 해야 한다. User Program은 파일의 목록과 크기를 관리한다음에, 각각의 map worker 프로세스에게, 어떤 조각을 처리할지를 알려주는 역할을 한다. 이를 위해서 User Program은 파일의 목록과 크기 뿐만 아니라, 해당 Worker가 파일의 범위를 찾아갈 수 있도록 offset정보도 유지를 해야 한다.

  1. User Program은 효율적인 병렬/분산처리가 가능하도록 파일을 일정한 크기를 가지는 M 개의 조각으로 쪼갠다.
  2. User Program에는 Master이라는 특별한 프로세스가 존재한다. Master은 worker에게 M map과 R reduce를 할당하며, 각 worker의 상태를 관리한다.
  3. worker은 할당된 문서를 읽어들인다음 유저정의 Map 함수를 이용해서 key/value 형태로 데이터를 읽어들인다. Map함수는 만들어진 key/value를 buffer 메모리에 저장한다. 이 경우 key는 파일명이 될 것이고, value는 {단어,count}가 될 것이다.
  4. buffer 메모리(혹은 파일)에 있는 key/value는 주기적으로 지역파일에 쓴다. 이 파일은 master에 의해서 관리되며, reduce worker에 할당이 된다.
  5. reduce worker이 master로 부터 신호를받았다면, map worker에 의해서 지역파일에 씌여진 buffer 데이터를 가져온다. master와 reduce worker와의 통신은 remote procedure call등을 이용할 수 있을 것이다. 중간파일을 전부 읽어들였다면, 동일 key값을 이용해서, value와 관련된 필요한 계산을 하면 된다. 하나의 reduce작업에 다양한 종류의 key들이 존재하기 때문에, sort가 된후 작업이 되어질 필요가 있다. 이러한 작업은 상당히 많은 메모리를 차지하게 되는데, 이럴경우 다른 프로그램에 소트를 맡기도록 한다.
  6. reduce worker에서 소트를 해서 유일한 key를 만들어 낼 수 있게 되는데, 이때 사용자 정의된 Reduce 함수를 이용해서, value에 대한 연산을 하면 된다. 연산후 결과는 output 파일로 씌여지게 된다 output 파일로 씌여지게 된다.
  7. 모든 작업이 완료되었다면, reduce worker은 프로그램 리턴값을 User Program에게 넘겨준다.

Master 데이터 구조체

master은 map 작업, recude 작업과 진행상태(휴면,진행중,완료)와 worker 기계에 대한 정보를 유지하고 있어야 한다. 또한 중간파일을 reduce worker에게 분배하기 위해서, 완전히 작업이 끝난 파일에 대해서 크기를 모두 유지하고 있어야 한다. 그래야지만 reduce worker에게 할당할 데이터의 크기를 결정할 수 있기 때문이다.

Fault Tolerance

이러한 MapReduce 시스템은 크게는 수백에서 수천의 기계를 이용해서 정보를 처리해야 하는 경우가 생기며, 때문에 Fault Tolerance는 매우 중요한 사항이다. 발생할수 있는 Fault의 형태에 따라서 어떻게 대응해야 하는지에 대해서 알아보도록 하자.

Worker Failure

worker에 주기적으로 Ping를 보내서 응답을 확인해야 한다. 만약 일정시간동안 응답이 없다면 worker에 문세가 생긴 것으로 간주한다. 이 경우 다른 idle상태의 worker를 찾아서 대신 처리하도록 해야 한다.

Master Failure

Master의 문제 해결은 비교적 간단하다. 이전 Master이 진행했던 작업이 중간파일 형태로 남아있고 이를 이용해서 checkpointe를 알아낼수 있다. 고로 그냥 Master를 다시 실행시키기만 하면 된다.

Semantics in ther Presence of Failures

Locality

네트워크 대역폭은 지역 컴퓨팅 환경에 비해서 상대적으로 열악하다. 특히 데이터가 다수의 원격 컴퓨터에 분산되어 있다면, 이를 처리하기 위한 환경을 만드는 것도 매우 힘든 작업이 될 수 있다. 때문에 네트워크 상에 가상의 파일시스템을 만들어서, 마치 로컬 컴퓨팅 환경인것 처럼 만들어줄 필요가 있다.

Google의 경우 GFS(Google File System)을 만들어서 이 문제를 해결하고 있다. 공개진영 소스에는 GFS(Global File System)가 사용되어지고 있다. 이들 주제는 별도의 페이지를 만들어서 정리해 보고자 한다.

샘플 프로그램

MapReduce 프로그래밍 모델을 테스트 하기 위해서 문서로부터 단어를 추출해서 카운팅하고, Map Reduce 과정을 거쳐서 최종의 결과물을 만드는 프로그램을 만들어 보도록 하겠다.

이 예제는 MapReduce의 이론적인 측면을 구현하는데 촛점을 맞출 것이다. Worker 스케쥴, Fault Tolerance 등은 고려되지 않을 것이다. Map Reduce 과정을 거쳐서 생성되는 Output File는 용이한 검색을 지원하기 위한 색인 DB가 될 것이다.

다음은 우리가 만들 색인 프로그램의 MapReduce 다이어그램이다.

https://lh5.googleusercontent.com/-fCFkpxXF3H4/UAerqPu1fvI/AAAAAAAACW0/T9XF8qqHYwo/s800/term.png

  1. 4개의 RFC문서가 주어진다.
  2. map worker에 분배하기 위해서 4개의 문서를 3개의 조각으로 바꾼다.
  3. 3개의 map worker을 실행시키고, 각각의 조각을 읽어와서 <DID, Term> 형식의 데이터를 생성한다.
이 작업은 병렬적으로 수행된다.
  1. reduce worker에서 <DID, Term> 데이터를 읽어와서 <Term, DID>로 Term Invert한다.
  2. Term Invert된 데이터를 Term.idx 파일에 저장한다.
이렇게 해서 3개의 <Term, DID> .idx 파일이 생성되었다고 가정을 해보자. 만들어진 결과물이, 색인데이터로써의 기능을 수행하기 위해서는 3개의 데이터를 다시 합쳐서 중복된 Term은 제거하고, 여러개의 문서에 포함되어 있는 Term은 하나의 레코드로 만들어줘야 한다. 아마도 아래와 같은 자료구조를 가질 것이다.
struct LastIndexResult
{
	string Term;
	vector<string> DID;
}
set<LastIndexResult> IndexSet;
그 다음 위의 정보를 파일로 전부 쓰면 된다. 라고 간단히 설명은 했지만, 쉬운 방법으로 해결할 순 없다. 쉽게 생각해보자면 각각의 .idx파일에 이미 Term으로 정렬된 값이 들어가 있으므로, IndexSet을 만드는게 그리 문제되지 않을 거라 생각할 수 있지만, 문서의 양이 많아질 경우 "엄청난 양의 메모리를 소비"해야 한다는 문제가 생길 수 있기 때문이다. 그러므로 데이터를 전부 메모리에 넣어서 정렬하는 것보다는 정렬된 중간파일을 만들어서 병합시켜주는 과정이 필요하다. 귀찮기는 하지만 .idx의 값들은 모두 정렬되어 있으므로, 중간파일을 만들어서 병합시키고 OUTPUT 파일을 만드는건 어려운 작업은 아닐 것이다.

색인에서 예제로 제시한 indexer를 수정하는 형태로 만들도록 하겠다.

Nutch의 MapReduce 엔진

MapReduce는 프로그래밍 모델이고, 실제 활용하기 위해서는 MapReduce프로그래밍 모델이 적용된 MapReduce 엔진이 필요하다. 이 엔진을 위해서는 크게 다음과 같은 3개의 요소가 필요할 것이다.
  • Mapper
Map Task를 실행하기 위한 프로세스로, 파일을 Split 하고, Split된 각 부분을 읽어서 <Key, Value>로 만들기 위한 function을 가진다. <Key, Value>의 목록은 중간파일 형태로 저장이 될 것이다.ffff
  • Reducer
Reduce Task를 실행하기 위한 프로세스로, Mapper에 의해서 중간파일 형태로 저장된 <Key, Value>파일을 읽어들여서, 결과파일로 만들어 낸다.
  • Job Tracker
각 Task를 실행하는 Job을 제어하기 위한 프로세스. Mapper과 Reduce를 실행시키고 Job을 할당한다.

MapReduce엔진은 일종의 프레임워크(:12)라고 볼 수 있으므로, 다양한 Job을 실행할 수 있어야 한다. 때로는 색인파일을 만들어야 하고, 때로는 네트워크 파일시스템을 만들 수 있어야 한다. 그렇다면 주어진일을 할 수 있는 Job을 배포할 수 있도록 시스템/소프트웨어적으로 구성이 되어야 할 것이다.

Java(:12)로 구현할 경우에는 바이트코드를 배포하면 되므로, 시스템이나 운영체제에 관계없이 비교적 쉽게 배포시스템을 만들 수 있을 것이다. Perl, Python등을 이용해서 구현해도 Java와 동일한 효과를 얻을 수 있을 것이다.

컴파일 언어- C(:12)/C++ 같은 -로 구현을 한다면, 시스템과 운영체제에 의존적이 될 것이다. 가장 좋은 방법은 모든 시스템을 동일한 운영체제와 동일한 컴파일러, 라이브러리를 가지도록 세팅 하는 것이다. 꽤나 주의 깊은 작업을 필요로 할 것이고, 독자적인 운영체제 배포판을 하나 만드는게 가장 좋을 것이다.

C/C++에서 job의 배포는 공유 라이브러리(:12)형태로 배포되어야 할 것이다. 무가동 시스템으로 만들려면 동적 라이브러리 형태로 만들어서, signal(:12)이 주어질 때, 다운로드 받은 라이브러리(:12)를 동적으로 적재시키는 방식을 사용해야 할 것이다.

참고문헌

  1. attachment:mapreduce-osdi04.pdf
  2. http://jaso.co.kr/tatter/index.php?pl=133 C언어를 이용한 hadoop 파일 시스템 처리
  3. http://www.jaso.co.kr/tatter/index.php?page=8&setdate=200609 hadoop 살펴보기
  4. http://wiki.apache.org/lucene-hadoop/HadoopMapReduce Hadoop MapReduce
  5. 다카하시의 radium software 내의 MapReduce 번역 : 한글문서
  6. 쓰레드 베이스의 MapReduce : 한글문서
  7. MapReduce 소개글 : 한글문서