Recommanded Free YOUTUBE Lecture: <% selectedImage[1] %>

Contents

MQTT Cluster

MQTT Cluster를 구성하는 목적은 아래와 같다.
  1. 대량의 메시지 처리 : 수백/수천만의 MQTT 클라이언트 요청을 처리할 수 있어야 한다. 당장 만들겠다는 것은 아니고, 목표는 그렇게 잡아보자는 이야기다. 꿈의 크기야 뭐 제한이 없으니까.
  2. 고가용성 : 안정적으로 서비스 할 수 있어야 한다. 서버 구성요소 중 하나 이상에 문제가 생기더라도, 서비스 제공이 가능해야 한다.
  3. 확장성 : Cluster에 노드를 추가하는 것으로 간단하게 서비스를 확장할 수 있어야 한다.

서비스 소개

어떤 타입의 서비스인지에 따라서 클러스터 구성 기술이 달라질 수 있기 때문에, 서비스를 특정하기로 했다. 가장 무난한 멀티 클라이언트 채팅 시스템을 선택했다. 원래는 IoT 기기로 부터의 센서 데이터 수집을 선택하려 했는데, 개념을 설명하는데는 일반적인 서비스가 나을 것 같아서 채팅 시스템으로 변경했다.

서비스의 기능은 다음과 같다.
  1. Private channel을 가진다. Private channel을 이용해서 단일 유저에게 메시지를 보낼 수 있다.
  2. Group channel을 가진다. 한 명 이상의 유저가 참여하는 Topic이다. 뉴스라면 구독 서비스가 되겠고, 채팅이라면 채팅방 서비스가 되겠다.
  3. 개인 메시지 함 : 유저가 연결하지 않은 경우, 유저 수신 메시지를 임시 저장하기 위한 서비스

설계

시스템 구성

시스템은 대략 다음과 같이 구성한다.

Node-1, Node-2는 유저의 Private channel(MQTT Topic)을 유지한다. Private channel로 보낸 메시지는 "Global message proc(이하 GMP)"로 보낸다. Global message proc는 이 메시지를 Message Queue에 적재한다. 이때, 유저 연결 테이블을 확인해서 유저가 어느 Node에 있는지를 찾아낸다. Node ID를 찾았다면, Node ID를 Key로 REDIS 에 밀어 넣는다.

User ID를 Key로 하지 않는 이유는 User ID를 key로 할 경우, 각 노드는 User ID의 갯수만큼 list에 접근을 해야 하는 문제가 있기 때문이다. Node ID로 하면, 한번에 자신의 데이터를 읽을 수 있다.

메시지 전송시점에 메시지 수신자가 연결하지 않았을 수도 있다. 이 경우 GMP는 유저 메시지 함에 메시지를 보내면 된다. 메시지 함은 User ID가 Key인 REDIS List로 관리한다. 새로 연결한 유저는 메시지 함에 읽지 않는 메시지가 있는지를 확인하는 과정을 거친다. GMP가 메시지함에 메시지를 쓰는 걸 끝내기 전에, 유저 가 연결을 끝내버릴 수도 있다. 이 경우, 유저는 다음 연결 전까지는 메시지함의 메시지를 확인할 수 없게 된다. 해결 방법을 고민해 보자면
  • 새로 연결한 유저는 수초 정도 후에, 메시지 함을 한번 읽도록 코드를 만든다.
  • 메시지함을 읽는 초기 간격을 너무 길게 하면 메시지 반응성이 떨어지고, 너무 짧게하면, 메시지 함에 늦게 도착하는 경우가 생길 수 있다. 1초, 10초, 1분 이렇게 3번 정도 다른 간격으로 읽게하면 문제를 해결할 수 있다.

Group Table

라우팅 테이블의 데이터 구조는 대략 아래와 같을 것이다.
Group Table
  - Group A : {user-1, user-2, user-3}  
  - Group B : {user-1, user-4, user-5}

유저 연결 상태

REDIS BitMap으로 구현한다. 클라이언트의 연결 상태(0 or 1)을 저장하는 BitMap 테이블이다. 클라이언트의 연결 정보는 다른 테이블에 저장한다. 연결 상태를 따로 저장하는 이유는 아래와 같다.
  • 클라이언트의 연결 정보는 Hash(REDIS 자료구조)로 구성해야 할 건데, Hash에서 데이터 검색은 꽤 많은 시간이 걸린다.
  • 그룹의 유저에게 메시지를 보낸다고 가정해보자. 이 그룹에는 10명의 유저가 있는데, 현재 2명만 연결된 상태다. Hash 만 이용할 경우 10번의 Hash 연산이 필요하다.
  • 클라이언트의 연결 정보는 BitMap으로 구성할 수 있는데, BitMap 연산은 O(1)이다. 연결하지 않은 8명의 유저의 경우 바로 메시지 함으로 메시지를 보내면 되고, 2명의 유저에 대해서만 연결정보를 찾아서 메시지를 전송하면 된다.

메시지 함

유저가 받지 못한 메시지를 임시 저장하기 위한 공간이다. 유저가 연결하면 먼저 메시지함의 메시지를 처리하도록 한다. 역시 REDIS로 구성한다.

자료 타입으로 리스트(List)를 선택했다. User의 ID를 Key로 하고, 메시지는 LPUSH로 밀어 넣는다. 이렇게 구성하는 이유는 다음과 같다.
  • User ID를 Key로 List를 만든다. 직관적이다.
  • 유저가 메시지를 소비하지 않을 경우 List에 계속 데이터가 쌓일 거다. 무한대로 쌓아둘 수는 없으니 크기에 제한을 둬야 하는데, LPUSH와 LTRIM 조합으로 하면, 리스트 크기를 관리하기가 쉽다. 예를들어 최근 1000개의 메시지만 남기고, 오래된 메시지는 삭제(혹은 다른 데이터베이스에 저장)하는 식의 구현이 가능하다.

메시지 흐름

메시지 흐름을 자세히 그려보았다.

  • MQTT Client는 Private topic만 가진다. 이 Private topic으로 모든 메시지를 받도록 구성한다. 예컨데, 그룹 메시지도 private topic "그룹타입의 메시지 형태"로 보낸다.
  • 메시지는 Msg Sender로 보낸다.
  • Msg Sender은 Global Message Proc로 보낸다.
  • Global Message Proc는 private message인지, Group message인지를 확인해서 처리한다.
    • private message라면, user_id를 key로 하는 REDIS list에 밀어 넣는다.
    • group message라면, 유저 목록을 찾아서 user_id 갯수 만큼 REDIS list에 밀어 넣는다.
  • 각 Node에 있는 Message proc는 메시지를 읽어서 수신 유저의 topic에 PUB 한다.
REDIS Message Queue는 BLPOP를 이용하면 블럭킹 모드에서 데이터를 꺼낼 수 있다. 여기에서는 Message queue를 이용해서 메시지를 분배하고 있는데, HTTP로 분배하는 방법을 고려할 수도 있다. 장/단점이 있는데
  1. Message Queue는 HTTP 보다 효율적으로 작동한다. 반면에 MQTT Node가 뻗어버릴 경우 Message Queue에 있는 메시지를 처리하기가 까다롭다는 단점이 있다.
  2. HTTP는 Message Queue보다 느리게 작동할 거다. 하지만 동기방식이기 때문에, 에러처리가 명확하다는 장점이 있다. 메시지를 수신할 MQTT 노드가 뻗었다면, 메시지 함에 넣기만 하면 된다.
장/단점을 적어 놓고 보니, Message Queue 보다는 HTTP로 Message proc에 직접 쏴주는 방식이 낫겠다는 생각이 든다.

MQTT 클러스터

MQTT 클러스터 구성 - 1

MQTT 노드가 수백/수천을 넘어가는 경우를 생각해 보자. 아래와 같은 구성을 생각해 봤다.

MQTT 클러스터를 위한 VPC를 구성할 거다. VPC의 네트워크 크기는 16bit이니, MQTT 노드가 늘어남에 따라서 24bit 크기의 subnet을 할당하면 될테다.(실제 구축한다면 Avalibility zone을 고려해야 하니 약간 달라지겠는데, 여기에서는 단순화 했다.)

MQTT 노드에서 GMP로 가는 트래픽은 ELB로 분산할 수 있다. 하지만 GMP에서 MQTT로 가는 메시지의 경우 경로가 정해지기 때문에 특정 노드로 트래픽을 보낼 수 있어야 한다. 따라서 해당 노드로 직접 메시지를 보내야 한다.

MQTT 노드가 죽을 경우에는 노드로 메시지 전송이 실패할 건데, GMP는 노드가 살아 있는지 죽어있는지를 알 수 있어야 한다. 만약 클라이언트가 향하는 노드가 죽어있다면, 메시지는 유저 메시지 박스로 보내야 한다. MQTT 클러스터 노드의 상태를 관리해야 한다는 의미다. Zabbix를 이용해서 노드 상태를 모니터링 하고, 그 결과를 REDIS에 적는 등의 방법을 생각했는데, 결국 Zookeeper를 이용해서 MQTT 클러스터를 관리하기로 했다.

그림에서 MQTT Node subnet에 파란색 부분이 있다. 각 서브넷에 메시지를 분배하는 녀석을 따로 두면 어떨까라는 생각으로 집어 넣었는데, 굳이 넣어야 하는지는 비용 측면에서 고민을 해봐야 겠다.

MQTT 클러스터 구성 - 2

굳히 GMP라는 걸 둬서 네트워크 홉(Hop)을 늘릴 필요가 있을까 ? 유저 연결 정보는 Bitmapfixed string으로 관리 할 계획인데, 둘다 한번에 가져올 수 있는 자료구조다.

그냥 각 MQTT 노드가 직접 목적지 노드로 메시지를 보내는 것은 어떨까. ?

MQTT의 토픽(topic)을 REST 하게 구성 하면, 메시지를 직렬화하지 않고 토픽만으로 연결정보를 요청할 수 있을 거다. 클라이언트 연결여부와 연결 노드를 확인하면, 해당 노드로 직접 메시지를 보낸다.

노드들의 상태는 주키퍼로 관리한다. 단지 상태만 관리하는 거라면 천대가 넘어간다고 해서 관리에 문제가 생길 것 같지는 않다(라고 생각은 하고 있는데, 실제 환경에서는 어떨지 모르겠는데, 뭐 수천대까지 갈 이유가 아예 없을 수도 있어서..)

아주 단순한 구성이긴 한데, 문제가 눈에 보인다.

MQTT는 작은 크기의 메시지를 자주 보내는 목적으로 사용한다. 나는 IoT에서 사용을 생각하고 있는데, 이 경우에는 특히 더 그렇다. 수백대 혹은 수천대의 MQTT 서버와 연결을 맺어 놓는 막장 짓은(팩토리얼 이다 팩토리얼) 할 수 없고, 결국 메시지를 보낼 때마다 연결을 맺고 끝는 과정을 반복해야 한다. keep alive 시간을 줘서 일정시간 연결을 유지하는 방식으로 효율을 높일 수 있겠는데, 구성-1과 비교해서 어떤게 더 효율적일지는 테스트를 해봐야지 알 수 있겠다.

Redis를 이용한 유저 연결정보 관리

이 시스템의 가장 중요한 기능은 메시지를 유저에게 까지 성공적으로 라우팅 하는 거다. 메시지 라우팅은 Global Message Proc가 담당하는데, 성공적인 라우팅을 위해서는 아래 두 정보가 필요하다.
  1. 메시지를 수신할 클라이언트가 연결 상태에 있는가.
  2. 메시지를 수신할 클라이언트가 어느 노드에 연결해 있는가.
이들 정보는 Redis를 이용하기로 했다. 먼저 연결 상태(on, off)는 User connection Bitmap테이블에 저장하기로 했다. BitMap은 on, off 혹은 0과 1의 상태를 저장하기에 적당한 자료구조다. 10M 정도의 공간으로 1억명의 클라이언트 연결 상태를 저장할 수 있다. 게다가 상태를 알아내기 위한 시간복잡도는 O(1)이다. BitMap의 크기는 514MB이므로, 최대 43억의 클라이언트 연결 정보를 저장할 수 있다.

클라이언트가 연결 상태라면, User connection Hash 테이블에서 찾는다. 쉽게 생각할 수 있는 자료구조는 Hash 인데, 클라이언트의 수가 많아지면 아무래도 효율이 떨어질 수 있다. Hash를 여러개 만드는 것으로 이 문제는 해결할 수 있을 것이다.

혹은 REDIS string 자료구조에 저장하는 방법도 있다. 노드가 최대 8192정도까지 늘어난다고 가정하면, 4자리의 (fixed-size)고정크기로 저장을 할 수 있다. 예를들어 ID가 1780인 유저라면 1780 * 4 만큼 offset해서 4byte 값을 읽는 것으로, 유저의 노드를 찾을 수 있다. 1억명의 클라이언트라고 가정하면 총 4억 byte(381M)정도의 공간이 필요하다.

Hash 보다는 fixed-side string 자료구조가 더 나은 것 같다.

Zookeeper를 이용한 MQTT 클러스터 관리

Zookeeper에 MQTT Node를 등록해서 관리한다. 대략 구성은 다음과 같을 거다.

모든 MQTT 노드에는 zookeeper 클라이언트가 설치되서 Zookeeper에 ephemeral 노드 로 등록된다. 특정 MQTT 노드에 문제가 생기면, Zookeeper 서버는 Znode 트리에서 노드를 삭제한다. GMP GMP는 메시지를 보내기 전에, 클라이언트가 연결된 MQTT 노드가 정상 작동하는지 확인 한 후 메시지를 보낸다. 노드가 죽었다면, 메시지 함으로 보낸다.

서비스 HealthCheck 기능의 보강

MQTT노드를 ephemeral 노드로 등록하면, 클러스터 노드의 관리를 전적으로 zookeeper에 맡길 수 있다는 장점이 있다. 하지만 보완해야 할 문제가 있다. MQTT 노드에 문제가 생겼다는 것을 명확히 알 수 있는 사건들, 예컨데 MQTT Node의 네트워크 단절이라든지 shutdown의 경우에는 노드 디렉토리에서 깔끔하게 살제될테니 대응하는데 문제가 없다.

하지만 대부분 지저분한 사건이 발생한다는게 문제다. 예를 들어 MQTT 프로세스가 뻗어버리거나, 프로세스넌 잘 떠 있는데 데이터를 처리하지 못한다거나, 데이터를 처리하는 것 같기는 한데 성능이 지나치게 떨어진다거나, 데이터를 잘 처리하는 것 같기는 한데, 폭주해서 데이터가 마구 생성된다거나 하는 등의 사건은 Zookeeper 고유 기능으로 해결할 수 있는 문제가 아니다.

클러스터를 제대로 관리하려면, 노드의 건강 상태를 면밀히 체크할 수 있는 툴 개발이 필요하다. 노드의 건강 상태를 측정하기 위한 방법을 고민해 볼까 한다.

모니터링 솔류션과의 연동

Zabbix와 같은 전문 모니터링 솔류션을 도입한느 방법이 있겠다. 구성은 다음과 같다.

Zabbix server로 MQTT Node를 모니터링 한다. CPU, Memory, Disk Usage, Message 처리량, 네트워크 트래픽, 프로세스 갯수 등을 다방면으로 모니터링 해야 한다. Zabbix server는 모니터링 결과를 분석하고, 문제가 생겼을 경우 Zookeeper server의 노드 데이터를 변경하거나 삭제한다. 노드를 삭제해야 할지, 데이터 변경에서 끝낼지는 모니터링 이벤트의 중요도에 따라 달라진다. 예를들어 프로세스가 죽었다거나 처리량이 0이 등의 이벤트일 경우에는 즉시 삭제를 해야 할 것이다. CPU 점유율이 높다거나 처리량이 지나치게 적다거나 하는등의 이벤트에 대해서는 노드 데이터를 변경해서 관리자가 검토하도록 해야 할 거다.

노드 데이터가 변경되거나 삭제되면, 관리자는 Zabbix 모니터링 정보와 애플리케이션 로그를 이용해서 문제를 해결하면 된다.

Consistent Hashing을 이용한 클러스터 구성

로그인에 성공한 유저는 유저를 식별할 수 있는 값을 가지고 있을 거다. 이 값을 Key로 하는 consistent hashing 테이블을 유지하면, 유저가 연결된 노드를 더 빠르게 찾을 수 있다.

원리는 다음과 같다. K는 Key(유저 식별자), N은 노드다.
  1. 각 노드가 10개의 슬롯을 가지는 hashring을 만든다.
  2. N-1 < hash(K) <= N 일 경우 K를 가지는 클라이언트는 N 노드에 접속한다.
  3. 만약 N 서버에 문제가 생긴다면 클라이언트는 N+1노드에 접속한다.
이해를 돕기 위해 그림을 그렸다.

  1. 1 < hash(K1) <= 2 이므로 : K1 클라이언트는 노드-2에 연결한다.
  2. 2 < hash(K2) <= 3 이므로 : K2 클라이언트는 노드-3에 연결한다.
  3. 만약 노드-2가 다운되면 K1은 노드-2 + 1인 노드-3에 연결한다.
즉 노드의 상태만 알고 있으면, REDIS 테이블에서 클라이언트<->노드 정보를 조회할 필요 없이, 간단한 Hash 연산으로 클라이언트가 연결된 노드를 확인할 수 있다. 노드-2가 죽을 경우 노드-2로 향하는 연결이 모두 노드-3로 집중된다는 문제가 있다. 이 문제는 hash 연산을 한번 더 하는 것으로 간단히 해결할 수 있다.

Consistent Hashing을 적용한 구성이다.

노드를 가용성 존에 지그재그로 분산해서 배치하면 더 안전한 구성이 가능할 거다.