• yundream
  • 2018-07-15 03:28:20
  • 2018-05-27 16:47:46
  • 54915

Contents

메시징 애플리케이션

AWS에서 제공하는 인프라와 서비스들을 이용해서 대량의 메시지를 교환하는 메시징 애플리케이션을 만들어보려 한다.

ElasticCache REDIS PUB/SUB

메시지는 REDIS PUB/SUB을 이용한다. 아래와 같은 구성을 가질 것이다.

 PUB/SUB 구성

PUB/SUB은 REDIS 고유의 기능으로 구현에 어려움은 없다.
  1. 채팅방에 해당하는 Topic을 만든다.
  2. 메시지 수신자는 해당 Topic을 구독(REDIS SUBSCRIBE)한다.
  3. 송신메시지는 해당 Topic에 출판(REDIS PUBLISH)한다.
  4. PUBLISH된 메시지는 모든 수신자가 받아볼 수 있다.

Persistence Database 준비

REDIS PUB/SUB는 "메시지 큐가 아니다". 네트워크, 클라이언트 애플리케이션 혹은 어떤 시스템 문제로 메시지를 놓칠 경우 이 메시지를 확인 할 수 없게 된다. 어떤 클라이언트는 메시지를 받고, 어떤 클라이언트는 메시지를 받지 못할 수 있다. 따라서 별도의 데이터베이스에 메시지를 따로 보관해야 한다. 클라이언트가 메시지 수신에 실패 할 경우, 연결이 재 설정될 경우 먼저 데이터베이스로 부터 최근 N개의 메시지를 읽어와서 출력한 다음 실시간 메시징에 참여하도록 구성하면 될 것이다. 아래와 같은 구성이 되겠다.

 Database의 준비

  1. 메시지는 REDIS와 Databsase 양쪽으로 쏜다. Database는 RDS든 DynamoDB든 어느 것을 사용해도 문제 없을 것이다.
  2. 클라이언트를 실행하면 Subscribe와 데이터베이스 읽기를 동시에 한다. 데이터베이스 읽기가 우선으로 데이터베이스에서 최근 "N" 개의 메시지를 가져와서 출력 한 다음 Subscribe 도착한 메시지를 출력한다.
이렇게 하면 메시지가 빠지는 것을 막을 수 있을 것인데, 대신 메시지가 중복될 수 있다. 이 문제는 메시지 아이디로 해결 할 수 있다. 데이터베이스로 Mysql-MyISAM 엔진을 사용하는 경우 아래의 쿼리로 1씩 증가하는 메시지 아이디를 만들 수 있을 것이다.
UPDATE message_history SET seq = seq + 1 WHERE channel=5;
MyISAM은 테이블 레밸 잠금을 사용한다. 즉, 업데이트 쿼리를 실행하는 동안 전체 테이블이 잠기기 때문에 안전하게 메시지 아이디를 관리 할 수 있다. 다른 엔진을 사용하거나 DynamoDB와 같은 솔류션을 사용 할 경우에는 작동하지 않을 수 있으니 주의깊게 검토를 해야 한다. 혹은 아래와 같이 트랜잭션처리를 해야 할 것이다.
-- begin transaction here

select counter from myCounters where counter_id = 1 FOR UPDATE;

-- now the row is locked and nobody can read or modify its values

update myCounters set counter = ? where id = 1;

-- set ? to counter + 1 programmatically

commit; -- and unlock...
UPDATE는 테이블을 잠궈버리기 때문에 동시성 문제가 생기지 않을 것이다. 단순한 솔류션이 될 수 있기는 하지만 테이블이 잠기기때문에 성능 이슈가 생길 수 있다. 성능 이슈가 중요하다면 row 단위로 트랜잭션을 거는 것도 괜찮은 방법이 되겠다.

혹은 그냥 시간을 ID로 사용하는 것도 방법이다. 마이크로세컨드 단위로 시간을 설정하면, "거의 중복 없는" 메시지 아이디를 설정 할 수 있을 것이다. "거의 중복이 없다"는건 중복이 있을 수 있다는 얘기라서 기분이 찜찜 할 수 있겠는데, 이 정도는 클라이언트 영역에서 어렵지 않게 처리(유저ID까지 검사하면 중복은 100% 막을 수 있을 것이다. 마이크로세컨드 단위에서 한 유저가 두개의 메시지를 보내는 일은 없을테니) 할 수 있을 것이다. 무엇 보다 테이블 잠금이나 트랜잭션에 의한 애플리케이션의 복잡도 증가와 성능 문제를 신경 쓸 필요가 없는 장점이 있다.

분산 시스템

지금까지의 아이디어를 기반으로 테스트 애플리케이션을 만드는 건 어렵지 않을 것이다. 하지만 대량의 메시지를 처리하기 위해서
  1. 두 개 이상의 Sub 프로세스와 두 개 이상의 Pub 프로세스의 구성
  2. REDIS 클러스터의 구성
  3. 성능과 확장성과 가용성을 모두 일정 수준 이상으로 유지
할 수 있는 시스템을 구성하는 건 쉬운일이 아니다. 쉬운일이 아니지만 머리를 굴려보자.

PUB/SUB KEY 관리

결국 Subscriber 는 두 개 이상으로 구성을 해야 할 거다. N 개의 Subscriber가 있다고 가정해보자. 문제를 단순화 하기 위해서 REDIS는 하나의 노드로만 구성을 했다.

 구성-1

ELB(Classic ELB)는 클라이언트의 요청을 라운드로빈 하게 Subscriber로 분배 할 것이고, Subscriber는 메시지를 받기 원하는 토픽을 Sub 하게 된다. 이 모델은 잘 작동 할 것이다. 하지만 좋은 모델은 아니다. 나는 하나의 Topic에 대해서 하나의 고루틴을 만들 생각이다. ELB가 라운드로빈하게 요청을 분산하기 때문에, 하나의 토픽에 대해서 Subscriber 갯수 만큼의 연결이 만들어진다. 만약 메시지를 주고 받는 토픽이 100만개이고 10개의 노드가 있다면, 모든 Subscriber에 100만개의 연결이 만들어질 것이다.

 중복되는 고루틴

나는 "100만개의 토픽이" 10개의 노드에 분산되서 처리되는 걸 원한다. 아래와 같이 특정 토픽은 특정 Subscriber로 향하게 만들어서 토픽을 분산할 수 있어야 한다.

 토픽을 분산

컨시스턴트 해시(Consistent hash)를 이용해서 이 문제를 풀기로 했다.

컨시스턴트 해시를 이용한 채널 분산

Jumped consistent hash를 이용하기로 했다. 이 해시를 이용하면 전체 클러스터의 노드 갯수가 늘어나거나 줄어드는 상황에서 컨시스턴트하게 key를 분산 할 수 있다. 해시는 로 표현 된다. k는 key, n은 클러스터의 노드 갯수다. 여기에서 k는 topic id가 될 것이다. 알고리즘은 아래와 같다.
func Hash(key uint64, numBuckets int) int32 {

	var b int64 = -1
	var j int64

	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}

	return int32(b)
}
알고리즘에 대해서 궁금하다면 Jump Consistent hash 문서를 읽어보자.

유저 요청이 들어오면 해시알고리즘을 적용해서 라우팅을 해줘야 하기 때문에, Subscriber 앞단에 proxy 서버를 구성해야 한다. 일반적이고 간단한 구성인 것으로 보이지만 추가적인 자원이 들어가는 데다가 노드의 갯수를 proxy 서버에게 알려줘야 하는 등 실제 구성은 그리 단순하지가 않다. 물론 이렇게 구성 할 수도 있겠지만 가능한 단순 하게구성하려고 한다.

해서 애플리케이션이 연결하기 전에 어떤 노드에 연결할 지를 알려주는 라우터 서버를 만들기로 했다.
  1. 애플리케이션은 먼저 라우터 서버에 연결해서 어떤 노드에 연결할지를 요청한다.
  2. 라우터서버는 현재 클러스터 크기를 알고 있어야 한다. ELB는 타겟그룹(로드밸런서에 묶여있는 노드들의 그룹)에 이슈가 생기면 람다로 통지 할 수 있는 기능이 있다. 이 기능을 이용해서 ELB에 붙어 있는 노드의 갯수를 알 수 있다.
  3. 애플리케이션은 ELB로 간다. ELB를 ALB로 구성을 하면, Path로 원하는 노드에 요청을 라우팅 할 수 있다.
 Hash를 이용한 로드 밸런싱

정상적인 상황에서라면, 잘 작동 할 것이다. 문제는 Fail이 발생할 때의 처리 방안이다. 여기에서 Fail은 두 개 영역에서 발생 할 것이다.
  1. Subscriber 중 하나이상에 문제가 생김
  2. Redis에 문제가 생김
Subscriber에 문제가 생기는 경우를 먼저 살펴보자. AWS에서는(다른 클라우드도 마찬가지다) 자동으로 복구되도록 할 수 있다. 하지만 복구에는 시간이 걸리며, 관리자가 자동 복구가 실패해서 직접 복구를 해야 하는 경우가 있다. 클라우드에서는 소수의 애플리케이션이나 인스턴스가 실패하는 것은 심각한 문제로 취급하지 않는다. 예를 들어 10대의 노드로 이루어진 서비스가 있는데, 밤 사이에 한 두대가 죽었다고 하더라도 서비스 전체에 영향을 미치는 징후가 발생하지 않으면 굳이 전화벨을 울려서 관리자나 개발자를 깨우지 않는다. 그냥 메일만 보낼 따름이고, 아침에 출근해서 해결 한다. 클라이언트에서 노드가 죽는 건 크리티컬한 이슈가 아니다.

따라서 Subscriber 실패가 꽤 오랫동안 복구가 안될 수도 있다. 이 경우 어떤 일이 발생하는지 살펴보자.

컨시스턴시 해시로 분산환경을 구성하려면, 실패가 발생 했을 때 어떤 노드가 백업을 할지도 설정한다. 원칙대로 하자면 Subscriber가 실패하면, key를 Subscriber로 분산을 해야 하는데 이때 Key가 어떤 Subscriber로 분산될지를 알고리즘으로 찾을 수 있어야 한다. 예를 들어 Swift(오브젝트 스토리지)의 경우 모든 Key에 대해서 최소 2개의 백업노드를 준비한다. 메시징 서비스도 이런 방법을 사용 할 수 있다. Router에서 실패한 Subscriber로 향하는 key에 대해서는 한번더 Hash 연산을 하는 정도로 쉽게 구현 할 수 있다. 동일한 Hash 연산을 사용하기 때문에, 컨시스턴시하게 분산 할 수 있다. 실패와 복구 시점에서 약간 꼬일 수는 있지만 점진적으로 원래자리를 찾아갈 테니 큰 문제는 아니다.

REIDS스는 ElasticCache Redis Shard Cluster에 맡긴다.

 Shard Cluster

클러스터를 구성하는 각 샤딩은 하나의 프라이머리 노드와 하나 이상의 복제노드로 구성 할 수 있다. 프라이머 노드에 문제가 생기면 가장 낮은 지연을 가지는 복제노드가 프라이머리가 된다.

Pub/Sub 서버 Failover 자세히 살펴보기

노드에 문제가 생겼을 때, 별다른 조치를 취하지 않기로 했다. 이때 예상 할 수 있는 상황은 아래와 같다.
  1. 노드에 연결했던 애플리케이션의 웹 소켓 연결이 종료된다. 네트워크오류 등에 의한 일시적인 실패일 수도 있으니 3번 정도 연결시도를 한다.
  2. 3번 모두 연결에 실패했다면
  3. 라우터에 어디에 연결해야 할지를 묻는다.
  4. 라우터는 어느 노드에 연결해야 할지를 알려준다.
라우터가 타겟노드의 상태를 정확히 모니터링 하고 있다면, 잘 작동 할 것이다. 문제는 "정확히 모니터링"하기가 까다롭다는 점이다. HealthCheck는 성공이지만 웹 소켓 연결은 실패 할 수 있기 때문에, 실제 웹 소켓 연결 상태를 확인하거나 로그를 모니터링해서 연속되는 연결에러를 확인해서 능동적으로 타겟그룹에서 제외하는 등의 등 정교한 관리가 필요하다.

이 방식으로도 가능하겠으나 복잡하다. 시스템은 가능한 단순하며 스스로 복구 할 수 있도록, 그리고 특정자원을 사용하지 못하더라도 다른 자원을 사용해서 서비스를 할 수 있도록 해야 한다.

처음 연결을 할 때, 노드 실패시 연결해야 하는 백업노드를 알려주면 어떨까 ? 두 개의 백업노드를 구성한다고 가정하면 그냥 Hash를 2번 더 돌리면 된다. 즉 아래와 같은 테이블이 만들어진다.
Key Master Replica-1 Replica-2
0001 2 1 4
0002 1 2 3
0003 3 4 2
이 테이블은 (Swift 처럼) 서버(이 경우 라우터)가 메모리에 들고 있을 필요가 없다. 클라이언트가 물어볼 때, 연산해서 알려주면 된다. Key가 0001인 클라이언트는 2, 1, 4 를 받을 것이다. 만약 2가 실패하면 1, 1도 실패하면 4번 노드에 대한 요청을 할 것이다. ALB의 Path-Based Routing를 이용해서 간단히 구현할 수 있다. 이 방법의 장점은 아래와 같다.
  1. 클라이언트가 능동저으로 노드에 연결 할 수 있다.
  2. Proxy를 둘 필요가 없다.
  3. 서버 메모리를 유지 할 필요 없다. 약간의 Hash 연산만 추가될 뿐이다.
ALB를 이용해서 노드를 타겟그룹과 연결하려면 노드의 갯수 만큼 타겟 그룹이 늘어나는 문제가 있기는 한데(완벽한 설계는 없는 법이다.) 얻는 이점에 비하면 큰 문제는 아니라고 할 수 있겠다.

 여러 타겟 그룹

각 타겟 그룹을 2개 이상의 노드로 구성하는 식으로 가용성을 크게높일 수 있을 것이다.

REDIS FailOver

REDIS 노드의 실패&복구는 AWS에 맡긴다고 퉁치고 넘어가려고 했는데, 아무래도 좀 자세히 살펴봐야 할 것 같다.

AWS ElasticCache는 샤드 클러스터(Shard Cluster)서비스를 제공한다. 관리자는 아래와 같이 웹 콘솔(혹은 CLI)을 이용해서 Cluster 모드로 레디스 클러스터를 전개 할 수 있다. 아래의 경우 2개의 복제를 가지는 3개의 샤드로 구성된 클러스터를 만들었다.

이렇게 만들어진 클러스터는 Redis의 hash slot를 기반으로 자동으로 해싱을 수행한다. 이렇게 해싱을 하려면 애플리케이션은 redis cluster client 라이브러리로 개발을 해야 한다. 클러스터 해싱의 특징은 아래와 같다.
  • 각 클러스터는 16384개의 해시 슬롯(hash slots)을 가진다. 해시연산으로 CRC16 modulo {key}를 사용한다.
  • 슬롯은 전체 샤드클러스터로 분산배치 된다.
  • 개발자는 Redis cluster 클라이언트를 사용해서 개발해야 한다. 클라이언트는 자동으로 올바른 샤드로 리다이렉트 한다.
5개의 샤드로 구성된 클러스터라면 아래와 같은 슬롯 맵이 만들어 질 것이다.

 ElasticCache Cluster Sharding

클러스터는 최디 15개의 샤드로 구성 할 수 있으며, 각 샤드는 5개의 복제 노드를 가질 수 있다.

 Redis 클러스터

위 그림은 "샤드 3 x 복제 2" 로 이루어진 클러스터를 보여주고 있다. 만약 프라이머리 샤드에 문제가 생기면, 복제 중 하나가 프라이머리로 승격해서 읽기/쓰기 서비스를 수행한다. 이 과정은 미리세컨드 이내에 이루어진다.

결론. 정말로 AWS에 맡기면 될 것 같다.

마무리

최종 구성이다.

 최종 구성

뭔가 될 거 같기는 하다. 작동하는 애플리케이션을 만들려면 아래의 것들을 추가 해결해야 할 것이다.
  • 인증/권한 : PUB/SUB 채널에 대한 인증/권한 로직을 만들어야 한다. Cognito로 해결 할 수 있을 거 같은데 살펴봐야겠다.
  • 보안 : 요즘엔 메시지 보안도 엄청 중요하다.
  • 플러그인 : 플러그인을 개발 다른 앱들을 붙일 수 있는 그런 구조를 만들고 싶은데..

참고