Recommanded Free YOUTUBE Lecture: <% selectedImage[1] %>

Contents

이 문서의 목적

  • Kafka 프로듀소와 컨슈머의 개발하는 법을 익힌다.
  • 컨슈머 그룹에 대한 개념을 잡는다.
  • 메시지 교환 프로그램의 구조를 개발한다.
  • IoT응용을 찾는다.

읽기전에

kafka의 구성을 끝냈다고 가정한다. 아직 kafka 구성을 마치지 않았다면 Kafka QuickStart 문서를 참고해서 구성을 끝내자.

chatting 프로그램 구조

채팅 프로그램은 아래의 구조를 가진다. 가장 단순한 구조로 만들고, 이 구조를 확장해 나가자.

 채팅 프로그램 구조

쉽게 생각 할 수 있는 구조다. 클라이언트는 웹 브라우저일 수도 있고, 네이티브 애플리케이션일 수도 있다. 이들 클라이언트는 WebSocket, HTTP, MQTT 등의 메시지 인터페이스를 가지고 있다. 서비스 특성에 따라서 어떤 프로토콜을 사용할지가 결정될 것이다. 웹 브라우저를 사용하는 서비스라면 HTTP와 WebSocket 조합 혹은 Websocket만으로 구성할 것이다. IoT 서비스 혹은 네티이브 애플리케이션을 대상으로 하는 서비스라면 WebSocket과 MQTT 조합으로 구성할 것이다. 물론 정답은 없다. 3개 모두를 이용 할 수도 있고, 전혀 다른 프로토콜을 이용 할 수도 있다.

여기에서 클라이언트는 웹 브라우저다. 웹 브라우저로 채팅 서비스를 만든다면, 메시지 전송은 HTTP로 하고, 수신 메시지를 websocket으로 하는게 가장 편할 거다.

클라이언트가 메시지를 보내면, 이 메시지는 Chatting server로 전달된다. Chatting server는 일종의 메시지 게이트웨이로 메시지에 대한 인증/권한 검사와 메시지의 송신, 수신 기능을 담당한다. 여기에서는 인증/권한은 신경쓰지 않는다.

Chatting server는 메시지를 그대로 kafka 클러스터 토픽으로 전송한다. Message processor는 kafka 토픽으로 부터 채팅 메시지를 읽고 처리해서 chatting 서버로 전송한다.

Web application server의 사양은 아래와 같다.
  • Message gateway로 작동한다.
  • HTTP와 Websocket 인터페이스를 제공한다.
  • HTTP는 메시지 전송을 위해서 사용하며, Websocket은 메시지 수신을 위해서 사용한다.
  • 클라이언트의 연결(connection) 정보를 데이터베이스에 저장한다. WAS는 두개 이상으로 구성될 것이다. 따라서 목적지를 찾기 위해서는 클라이언트 연결 정보를 저장해둬야 한다.
Kafka의 사양은 아래와 같다.
  • 3개의 kafka 노드를 준비한다.
  • 토픽 이름은 chatting로 한다.
  • 토픽 가용성을 위해서 2개의 리플리카를 만든다.
컨슈머가 하는 일은 아래와 같다.
  • 메시지를 읽어서 누가 누구에게 보내는 메시지인지를 확인한다.
  • 메시지 수신자를 찾았다면, routing table에서 이 유저가 어느 WAS에 연결하고 있는지 확인해서 메시지를 보낸다. 연결상태가 아니라면 메시지 박스에 저장해야 할 것이다.

개발 환경

kafka 클러스터 구성은 kafka 시작하기문서를 참고 한다. 개발언어는 go를 사용한다.
  • kafka-01, kafka-02, kafka-03 3개의 카프카 서버를 준비했다.

go 언어를 이용한 이유

(가장 큰 이유는 내가 go언어를 사용하고 있기 때문이다.) Kafka를 중앙에 두는 소프트웨어 구성은 MSA모델에 적합한 그림이 나온다. 클라이언트 종단에 위치하는 Message gateway의 경우에도 인증과 권한 서비스는 외부에 맡기고, 메시지를 중계하기 위한 최소한의 기능만을 가진다. Kafka 클러스터에 붙는 각각의 컨슈머들도 자신에게 주어진 최소한의 일들만을 한다.

즉 전체적인 그림을 보자면, simple is best의 척할을 따르는 작은 프로세스들이 IPC의 역할(특히 파이프의 역할과 비슷하다)을 하는 kafka를 통해서 연결되는 형국이다.

Go 언어는 simple is best한 서버 애플리케이션을 개발에 매우 적합한 언어다. net/http, grpc 두 개의 패키지만 가지고도 간단하게 서버 애플리케이션을 개발 할 수 있다. 생산성은 python 보다는 낮지만(MSA 류의 서버 애플리케이션 개발에 있어서는 python과 차이가 없다는게 내 생각이지만, 전반적으로는 python 보다는 못하다는 의견이다), C, C++ 과는 훨씬 높다. 배포하기도 쉽고(one file 배포가 가능하다), C와 비슷한 수준에서 빠르다.

이러한 이유로 Go 언어를 선택했다.(Java를 못해서가 아니다. 쿨럭...)

Chatting Server & 프로듀서

아래 그림은 web application server 구성을 보여주고 있다. HTTP로 들어오는 메시지를 읽어서 kafaka 토픽으로 전달하는 한편 Message processor로 부터 전달된 메시지를 클라이언트로 전송한다.

실제 환경에서 web application server는 두 대 이상으로 구성이 될 것이다. 따라서 앞단에는 로드밸런서가 놓일 것이다. 클라이언트는 여러 대의 서버 중 어느 하나에 websocket 연결을 맺을 건데, 어느 서버에 붙을지 알 수 없다. 그러므로 특정 유저에게 메시지를 보내려면 클라이언트가 어느 서버에 붙어 있는지에 대한 정보를 어딘가에 저장해야 한다. 여기에서는 단순하게 하나의 웹 서버만으로 구성한다.

이 서버는 카프카 토픽으로 메시지를 보내므로 프로듀서의 역할도 수행한다. 구성은 아래와 같다.

 Chatting Server 구성

유저가 /chatting을 호출하면, handler는 웹 소켓 클라이언트인 wsClient를 호출한다. wsClient는 http 연결을 websocket 으로 protocol upgrade하고 자신의 포인트를 map에 저장한다. map의 키는 유저를 식별 할 수 있는 어떤 값이고, 값으로는 클라이언트의 포인터다.

이제 MSG Receiver로 메시지가 전달된다. 이 메시지는 컨슈머가 처리를 끝낸 메시지로 메시지를 받을 유저의 정보를 포함하고 있다. MSG Receiver는 메시지에 있는 유저 식별자를 이용해서 map에서 wsClient를 찾는다. wsClient가 있다면, wsClient.PushMsg() 를 호출한다. 이 메서드는 연결된 웹 소켓으로 메시지를 전송 한다.

채널(Channel)을 이용해서 메시지를 보내는 방법도 있다. PushMsg() 메서드를 채널로 바꾸기만 하면 된다. 직접 메서드를 호출하는 것에 비해서 쓸데 없는 오버헤드가 있는 것 같기는 한데, 버퍼드 채널을 이용해서 메시지를 버퍼링 할 수 있다는 장점이 있다.

위 그림에서는 message를 보내는 주체가 빠져있다. 1. Message는 Message processor가 라우팅 테이블을 뒤져서 클라이언트가 어느 web application server에 붙어 있는지 알아내서 전송하거나 2. 메시지 라우팅만을 처리하는 서버에 위임 하는 방법이 있다.

1의 경우 REDIS등으로 라우팅 테이블을 유지하고, 여기에서 읽어가게 하는 방법이 있겠으나 개발자들이 구현해야 하므로 좋은 방법은 아니다. 라우팅 테이블에 질의하는 라이브러리를 만들어 주는 방법도 있겠으나 언어마다 대응해야 하기 때문에 좋은 방법은 아니다.

역시 메시지 라우팅만을 처리하는 별도의 서버를 만드는 방법이 깔끔할 것 같다. 이미 만들어진 kafka 클러스터를 이용하면 될 테다.

 메시지 라우터용 전용 서버 구성

Chatting 서버 개발

Sarama Golang kafka 클라이언트 패키지를 이용하기로 했다. Chatting 서버는 프로듀서의 역할을 한다. 클라이언트로 부터 HTTP 데이터를 받아서, 카프카의 chatting 토픽에 Publish 한다. 데이터 포맷은 json 이다. 귀찮은 관계로 전체 채팅 서버를 개발하진 않겠다. 카프카에 메시지를 넣는 부분만 개발 하고, 유닛테스트로 작동여부를 확인한다.
{
    "from": "sender",
    "to": "receiver",
    "message": "send message ....."
}
sarama 패키지를 설치한다.
$ go get github.com/Shopify/sarama
replication factor가 1이고 두 개의 파티션으로 구성된 chatting 토픽을 만들었다.
$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic chatting
$ # ./kafka-topics.sh --list --zookeeper localhost:2181
chatting
testing
테스트를 위해서 컨슈머를 실행했다.
$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic chatting
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
간단한 프로듀서 프로그램을 만들었다.
package producer

import (
    "fmt"
    "github.com/Shopify/sarama"
) 

type Producer struct {
    ChatProducer sarama.SyncProducer
}

func NewProducer() *Producer {
    config := sarama.NewConfig()    
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true
    c, err := sarama.NewSyncProducer([]string{
        "kafka-01:9092",
        "kafka-02:9092",
        "kafka-03:9092"}, config)       
    if err != nil {
        panic(err)
    }
    return &Producer{ChatProducer: c}
}

func (p *Producer) Close() error {
    err := p.ChatProducer.Close()   
    if err != nil {
        return err
    }
    return nil
}
func (p *Producer) SendStringData(message string) error {
    partition, offset, err := p.ChatProducer.SendMessage(&sarama.ProducerMessage{
        Topic: "chatting",
        Value: sarama.StringEncoder(message),
    })
    if err != nil {
        return err
    }
    fmt.Printf("%d/%d\n", partition, offset)
    return nil

}
아래는 프로듀서를 테스트하기 위한 테스트 코드다.
package producer

import (
    "fmt"
    "github.com/stretchr/testify/assert"
    "testing"
)

func Test_Init(t *testing.T) {
    p := NewProducer()
    for i := 0; i < 30; i++ {
        err := p.SendStringData(fmt.Sprintf("Message #%d\n", i))
        assert.Nil(t, err, "")
    }
}
테스트 프로그램을 실행하면, 앞서 실행해 놓은 컨슈머 프로그램에 메시지가 전달 되는 걸 확인 할 수 있다.
$ go test
1/1PASS
ok  	github.com/yundream/chatting/producer	0.007s
chatting 서버는 HTTP 혹은 WS로 받은 메시지를 위의 코드를 이용해서 카프카에 쓰면 된다.

chatting 토픽은 2개의 파티션으로 구성이 된다. 프로듀셔 프로그램은 파티션을 선택해야 하는데, 위 코드에서는 "NewRandomPartitioner"를 이용해서 랜덤하게 선택하도록 했다. 귀찮아서 랜덤을 선택하긴 했지만, 이 경우 메시지의 순서가 바뀔 수 있으므로 Hash 함수등을 이용해서, 메시지를 받는 파티션을 고정하는 방법을 이용해야 할 것이다.

컨슈머 개발

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
    "os"
    "os/signal"
    "time"
)   

type Consumer struct {
    Consumer          sarama.Consumer
    PartitionConsumer [2]sarama.PartitionConsumer
}

func NewConsumer() (*Consumer, error) {
    consumer, err := sarama.NewConsumer([]string{ 
        "kafka-01:9092",
        "kafka-02:9092",
        "kafka-03:9092"}, nil)
    if err != nil {
        panic(err)
    }
    c := &Consumer{Consumer: consumer}

    partitions, err := consumer.Partitions("chatting") 
    if err != nil {
        panic(err)
    }
    for i, v := range partitions {  
        c.PartitionConsumer[i], err = consumer.ConsumePartition("chatting", v, sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }

    }
    return c, nil

}

func (c *Consumer) Run() {
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    for _, v := range c.PartitionConsumer {
        go func(v sarama.PartitionConsumer) {
            for {
                select {
                case msg := <-v.Messages():     
                    fmt.Println("Message   : ", string(msg.Value))
                    fmt.Println("Partition : ", msg.Partition)
                    fmt.Println("Offset    : ", msg.Offset)
                }
            }
        }(v)
    }
    // 귀찮아서 sleep 걸었다.
    time.Sleep(time.Second * 3600)  
}

func main() {
    consumer, err := NewConsumer()
    if err != nil {
        panic(err)
    }
    consumer.Run()
}
컨슈머를 실행 한 다음, 프로듀서 유닛테스트를 돌리면 아래와 같은 출력 결과를 확인 할 수 있을 것이다.
$ go run consumer.go 
Message   :  Message #0

Partition :  1
Offset    :  39
Message   :  Message #1

Partition :  0
Offset    :  34
Message   :  Message #2

Partition :  1
Offset    :  40
Message   :  Message #3

Partition :  0
Offset    :  35
Message   :  Message #4

Partition :  0
Offset    :  36
Message   :  Message #5

......
두개 파티션 모두에서 메시지를 읽는 걸 확인 할 수 있다.

컨슈머 그룹

두 개 이상의 컨슈머를 하나의 그룹으로 만들 수 있다. 그리고 하나 이상의 컨슈머 그룹을 구성 할 수 있다. 우리는 컨슈머 그룹을 이용해서, 다양한 메시지 모델을 만들 수 있다.

PUB/SUB 구조를 만들어 보자. 컨슈머 그룹은 같은 컨슈머 아이디를 가지며, 이들 사이에서 offset을 공유한다. 반면 다른 컨슈머 그룹과는 offset을 공유하지 않는다. 따라서 하나의 메시지를 서로 다른 컨슈머 그룹에 보낼 수 있다. 예컨데, 특정 메시지를 실시간 분석하는 컨슈머와 메시지를 처리하고 저장하는 컨슈머에 보내는 식의 구성이 가능하다.

 컨슈머 그룹

chatting 서비스를 위해서 컨슈머 그룹을 만들기로 했다. 컨슈머 그룹의 이름은 testgroup.go로 chatting 토픽을 SUB 한다. 지금 chatting 토픽은 두 개의 파티션으로 구성됐으니, 아래와 같은 모습을 가진다.

 컨슈머 그룹 묘사

kafka 툴을 이용해서 컨슈머 그룹 상태를 확인해 보자.
$ ./kafka-consumer-groups.sh --list --zookeeper localhost
testgroup.go
두 개의 컨슈머를 붙이고 나서 testgroup.go 정보를 확인했다.
$ ./kafka-consumer-groups.sh --describe --group testgroup.go --zookeeper localhost

TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID 
chatting           0          1265            73              -1192      home:15335dbf-3a3d-4646-8040
chatting           1          1226            64              -1162      home:6bbe45d1-9e64-4261-a4a7
2개의 컨슈머가 붙어 있는 걸 확인 할 수 있다. 컨슈머 하나를 죽이고 확인해 보자.
$ ./kafka-consumer-groups.sh --describe --group testgroup.go --zookeeper localhost
TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID
chatting           0          1265            125             -1140      home:6bbe45d1-9e64-4261-a4a7
chatting           1          1226            102             -1124      home:6bbe45d1-9e64-4261-a4a7
하나의 컨슈머가 두 개의 파티션 모두를 SUB 하는 걸 확인 할 수 있다. 컨슈머 그룹에서의 파티션과 컨슈머들 사이의 연결은 주키퍼를 이용해서 자동으로 관리 한다.

두 개의 컨슈머에 대해서 3개의 컨슈머가 연결하면 어떻게 될까 ?
# ./kafka-consumer-groups.sh --describe --group testgroup.go --zookeeper localhost
TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID
chatting           0          1265            432             -833       home:6626cbae-437c-41c3-b301
chatting           1          1226            394             -832       home:6bbe45d1-9e64-4261-a4a7
두 개의 컨슈머만 사용하고 있다. 4개를 붙여도 마찬가지로 두 개의 컨슈머만 사용한다. 따라서 만약 컨슈머 그룹의 처리능력이 떨어진다면, 단순히 컨슈머 그룹에 있는 컨슈머의 갯수 뿐만 아니라, 파티션까지도 함께 늘려줘야 한다. 처음 시스템을 구성 할 때, 저 성능의 시스템으로 카프카를 구성하고 고성능의 시스템으로 컨슈머를 구성해야 할 것이다.

컨슈머의 갯수가 파티션 보다 클 경우, 여분의 컨슈머가 생기는데 이는 작동중인 컨슈머에 문제가 생겼을 때 백업용도로 사용 할 수 있다. 이 과정은 자동으로 이루어진다.

아래는 테스트에 사용한 코드다. sarama 패키지는 파티션을 리밸런싱 하는 등의 컨슈머그룹 제어 기능을 지원하지 않는다. 나는 sarama 기반의 sarama-cluster 패키지를 이용해서 아래 예제를 만들었다.
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	brokers := []string{"kafka-01:9092", "kafka-02:9092", "kafka-03:9092"}
	topics := []string{"chatting"}
	consumer, err := cluster.NewConsumer(brokers, "testgroup.go", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume messages, watch errors and notifications
	for {
		select {
		case msg, more := <-consumer.Messages():
			if more {
				fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				consumer.MarkOffset(msg, "") // mark message as processed
			}
		case err, more := <-consumer.Errors():
			if more {
				log.Printf("Error: %s\n", err.Error())
			}
		case ntf, more := <-consumer.Notifications():
			if more {
				log.Printf("Rebalanced: %+v\n", ntf)
			}
		case <-signals:
			return
		}
	}
}
brokers로 kafka-01, kafka-02, kafka-03을 설정했다. cluster.NewConsumer로 컨슈머 그룹을 설정한다. 컨슈머 그룹의 아이디로 "testgroup.go"를 설정했다. 가장 중요한 설정일 것이다. 이제 consumer.Messages 메서드를 이용해서, 채널로 부터 메시지를 읽으면 된다. go언어에서 제공하는 select를 이용하면 채널 입력을 멀티플랙싱 할 수 있다.

Kafka 0.8.1.1 버전까지는 컨슈머의 offset commit 정보를 zookeeper가 관리했다. 하지만 zookeeper는 이런류의 정보(빈번한 업데이트)를 관리하기에 좋은 데이터베이스는 아니다. 이 후 버전부터는 offset 정보를 zookeeper가 아닌 kafka가 직접 처리하고 있다. 내가 테스트에 사용한 0.10 버전역시 kafka가 직접 처리하고 있다. 테스트에 사용한 sarama-cluster 패키지도 kafka의 offset을 이용하고 있다.

채팅 애플리케이션 개발

이것으로 채팅 애플리케이션 개발을 위한 요소들에 대한 테스트를 모두 끝냈다. 각 요소들에 대해서 살펴보자.
  • kafka 클러스터 : 두 개의 파티션을 가지는 chatting 토픽을 만들었다.
  • Message Gateway : 클라이언트(브라우저)의 메시지를 받아서 chatting 토픽에 메시지를 전달 한다. 원래 Message Gateway는 메시지의 인증과 권한에 대한 기능을 가져야 하겠지만, 여기에서는 생략 한다.
  • Message process 그룹 : chatting 토픽에서 읽은 메시지를 처리 한 다음에, 메시지를 받을 Message Gateway 노드에 전달 한다.
이정도까지 다뤘으면, 나머지는 그냥 짜면 되는 거라서 재미가 떨어진다. 시간이 남으면 조금씩 만들어봐야 겠다.

Message Gateway

Message Process

참고