AWS MSK 를 테스트 중이다. MSK의 경우 가용 영역 갯수 만큼의 브로커를 만들어야 한다. 테스트하려는 도쿄리전은 3개의 가용 영역으로 구성돼 있으니 최소 사양으로 만들 경우 (kafka실행을 위한 최소 스펙)kafka.m5.large 인스턴스 3개를 실행해야 하므로 상당한 비용이 들어간다. AWS MSK에서 테스트할 kafka 버전은 2.x인데, 2.x는 사용해본적이 없어서 시간이 꽤나 걸릴 것 같은(비용도 그만큼 올라갈 것 같은) 불길한 예감이 들었다.
그래서 로컬 PC에서 테스트를 충분히 한 다음, AWS에서 테스트를 해보기로 했다. 예전 같으면 VirtualBox로 몇 개의 VM을 만들어서 테스트했겠지만 도커가 있는 요즘에는 그런 노가다를 뛸 필요가 없다. 도커를 이용해서 만들어보기로 했다.
kafka 클러스터를 실행하려면 zookeeper도 함께 실행해야 하므로 docker-compose설정이 있는지를 찾아봤다. kafka stack docker compose를 선택했다.
다양한 조합의 docker compose yml 파일들이 있는데, 나는 full-stack.yml 과 zk-multiple-kafka-multiple.yml 두개의 파일을 이용해서 full stack multiple zk & kafka 시스템으로 구성했다. multiple kafka & zookeeper에 kafka & zookeeper 관리 툴들 까지 함께 올려서 완전한 테스트,디버깅 환경을 만들었다고 보면 되겠다.
Kafka 환경 분석을 위해서 yml 파일의 내용을 출력해봤다.
docker-compose 명령으로 zookeeper & kafka 애플리케이션을 실행하고 정보를 살펴보자.
# docker-compose -f zk-multiple-kafka-multiple.yml up
$ docker-compose ps
/home/yundream/.local/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version!
RequestsDependencyWarning)
Name Command State Ports
-------------------------------------------------------------------------------------------------------------------------------
kafkafullstack_kafka-connect-ui_1 /run.sh Up 0.0.0.0:8003->8000/tcp
kafkafullstack_kafka-connect_1 /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp, 9092/tcp
kafkafullstack_kafka-rest-proxy_1 /etc/confluent/docker/run Up 0.0.0.0:8082->8082/tcp
kafkafullstack_kafka-schema-registry_1 /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
kafkafullstack_kafka-topics-ui_1 /run.sh Up 0.0.0.0:8000->8000/tcp
kafkafullstack_kafka1_1 /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp
kafkafullstack_kafka2_1 /etc/confluent/docker/run Up 9092/tcp, 0.0.0.0:9093->9093/tcp
kafkafullstack_kafka3_1 /etc/confluent/docker/run Up 9092/tcp, 0.0.0.0:9094->9094/tcp
kafkafullstack_ksql-server_1 /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp
kafkafullstack_schema-registry-ui_1 /run.sh Up 0.0.0.0:8001->8000/tcp
kafkafullstack_zoo1_1 /docker-entrypoint.sh zkSe ... Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
kafkafullstack_zoo2_1 /docker-entrypoint.sh zkSe ... Up 2181/tcp, 0.0.0.0:2182->2182/tcp, 2888/tcp,
3888/tcp
kafkafullstack_zoo3_1 /docker-entrypoint.sh zkSe ... Up 2181/tcp, 0.0.0.0:2183->2183/tcp, 2888/tcp,
3888/tcp
kafkafullstack_zoonavigator-api_1 ./run.sh Up 9000/tcp
kafkafullstack_zoonavigator-web_1 ./run.sh Up 80/tcp, 0.0.0.0:8004->8000/tcp
현재 구성은 아래와 같다.
테스트
테스트 환경은 아래와 같다.
워크 스테이션 : 개인 PC(도커 컴포즈를 이용해서 kafka&zookeeper 를 실행한 호스트 컴퓨터)
운영체제 : 우분투 리눅스 18.04
Docker 버전 18.09.6
Kafka 버전 2.2.0
zookeeper 버전 3.4.0
kafkacat
kafka 설치할 때 같이 배포되는 kafka cli는 사용하기 귀찮아서, kafkacat 이라는 관리툴을 설치했다.
앞서 만든 kafka의 정보를 확인해 보자. -L 을 이용하면 메타데이터 정보를 확인 할 수 있다.
$ kafkacat -L -b localhost:9092
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
3 brokers:
broker 2 at 127.0.0.1:9093
broker 3 at 127.0.0.1:9094
broker 1 at 127.0.0.1:9092
9 topics:
....
3개의 브로커와 9개의 토픽이 있는 걸 확인 할 수 있다.
new_topic 라는 새로운 토픽을 만들어서 컨슈머와 프로듀서를 테스트해보기로 했다. 먼저 컨슈머 모드로 실행을 했다. 파티션을 명시하지 않으면 모든 파티션으로 부터 메시지를 읽는다.
# kafkacat -b localhost:9092 -t new_topic -C
message 0001
% Reached end of topic new_topic [0] at offset 4
message 0002
% Reached end of topic new_topic [2] at offset 7
message 0003
% Reached end of topic new_topic [0] at offset 5
message 0004
% Reached end of topic new_topic [1] at offset 6
message 0005
% Reached end of topic new_topic [1] at offset 7
몇 번 파티션으로 부터 메시지를 읽었는지와 각 파티션에서의 offset 을 확인 할 수 있다. 토픽 new_topic에 대한 메타정보를 확인해 보자.
package main
import (
"fmt"
"log"
"os"
"os/signal"
cluster "github.com/bsm/sarama-cluster"
)
func main() {
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
brokers := []string{"localhost:9092", "localhost:9093", "localhost:9094"}
topics := []string{"message"}
consumer, err := cluster.NewConsumer(brokers, "testgroup", topics, config)
if err != nil {
panic(err)
}
defer consumer.Close()
// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// consume messages, watch errors and notifications
for {
select {
case msg, more := <-consumer.Messages():
if more {
fmt.Fprintf(os.Stdout, "msg [%d] [%d]> %s", msg.Partition, msg.Offset, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
case err, more := <-consumer.Errors():
if more {
log.Printf("Error: %s\n", err.Error())
}
case ntf, more := <-consumer.Notifications():
if more {
log.Printf("Rebalanced: %+v\n", ntf)
}
case <-signals:
return
}
}
}
프로듀셔를 실행 한 다음, 컨슈머를 실행해보자.
# go run consumer_cluster.go
2019/06/07 00:53:12 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 00:53:15 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1 2 3]] Released:map[] Current:map[message:[0 1 2 3]]}
4개의 파티션 바라보고 있는 걸 확인 할 수 있다. 컨슈머를 하나 더 실행하면 파티션에 대한 리밸런싱이 일어나는 걸 확인 할 수 있다.
# go run consumer_cluster.go
2019/06/07 00:53:12 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 00:53:15 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1 2 3]] Released:map[] Current:map[message:[0 1 2 3]]}
2019/06/07 01:13:16 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 01:13:16 Rebalanced: &{Type:rebalance OK Claimed:map[message:[]] Released:map[message:[0 1]] Current:map[message:[2 3]]}
# go run consumer_cluster.go
2019/06/07 01:13:14 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 01:13:16 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1]] Released:map[] Current:map[message:[0 1]]}
첫 번째 컨슈머는 2,3 두 번째 컨슈머는 0,1 파티션을 분산해서 처리하는 걸 확인 할 수 있다. 이렇게 파티션을 구성하는 것으로 메시지를 분산처리 할 수 있다. 아래 테스트 동영상이다.
컨슈머가 추가될 때마다 파티션에 대한 리밸런싱이 이루어지는 걸 확인 할 수 있다. 현재 애플리케이션 실행 구조는 아래와 같이 묘사 할 수 있다.
정리
Docker : VirtualBox를 이용했던 시절에 비해서 테스트가 너무 쉬워졌다. Kafka를 멀티노드로 테스트하려면 최소한 5개 정도의 인스턴스는 필요했는데, 지금은 docker-compose up 명령 하나로 kafka와 zookeeper 풀 셋을 전개 할 수 있다.
Zookeeper : 분산코디네이터. 브로커를 모니터링하고, 토픽, 파티션을 관리한다.
kafka : 스트림 프로세싱 영역에서는 산업표준이라고 할 수 있다. 프로젝트에 도입하고 싶은데, 아직은 마땅한 프로젝트가 없는게 아쉽다.
ksql : Kafka를 위한 streaming SQL 이라고 한다. 현재 ksql 서버도 설치되 있으니 ksql-cli 만 설치하면 테스트해 볼 수 있을 것 같다.
Contents
Docker 로 Kafka 테스트 환경 만들기
- Zookeeper 에 대한 기본 이해
- Kafka에 대한 기본 이해
- 컨슈머와 프로듀서
- 파티션
- 컨슈머 그룹
AWS MSK 를 테스트 중이다. MSK의 경우 가용 영역 갯수 만큼의 브로커를 만들어야 한다. 테스트하려는 도쿄리전은 3개의 가용 영역으로 구성돼 있으니 최소 사양으로 만들 경우 (kafka실행을 위한 최소 스펙)kafka.m5.large 인스턴스 3개를 실행해야 하므로 상당한 비용이 들어간다. AWS MSK에서 테스트할 kafka 버전은 2.x인데, 2.x는 사용해본적이 없어서 시간이 꽤나 걸릴 것 같은(비용도 그만큼 올라갈 것 같은) 불길한 예감이 들었다. 그래서 로컬 PC에서 테스트를 충분히 한 다음, AWS에서 테스트를 해보기로 했다. 예전 같으면 VirtualBox로 몇 개의 VM을 만들어서 테스트했겠지만 도커가 있는 요즘에는 그런 노가다를 뛸 필요가 없다. 도커를 이용해서 만들어보기로 했다. kafka 클러스터를 실행하려면 zookeeper도 함께 실행해야 하므로 docker-compose설정이 있는지를 찾아봤다. kafka stack docker compose를 선택했다. 다양한 조합의 docker compose yml 파일들이 있는데, 나는 full-stack.yml 과 zk-multiple-kafka-multiple.yml 두개의 파일을 이용해서 full stack multiple zk & kafka 시스템으로 구성했다. multiple kafka & zookeeper에 kafka & zookeeper 관리 툴들 까지 함께 올려서 완전한 테스트,디버깅 환경을 만들었다고 보면 되겠다. Kafka 환경 분석을 위해서 yml 파일의 내용을 출력해봤다.version: '2.1' # 주키퍼와 카프카 서비스를 설정한다 services: # 3개의 노드로 된 주키퍼 클러스터를 구성한다. zoo1: image: zookeeper:3.4.9 hostname: zoo1 # 클라이언트 포트 2181를 bind 한다. ports: - "2181:2181" # 주키퍼는 서버는 2개의 포트를 가진다. # 2888은 서버 노드끼리 통신을 하기 위해서 사용한다. # 3888은 리더 선출을 위해서 사용한다. environment: ZOO_MY_ID: 1 ZOO_PORT: 2181 ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 volumes: - ./zk-multiple-kafka-multiple/zoo1/data:/data - ./zk-multiple-kafka-multiple/zoo1/datalog:/datalog zoo2: image: zookeeper:3.4.9 hostname: zoo2 ports: - "2182:2182" environment: ZOO_MY_ID: 2 ZOO_PORT: 2182 ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 volumes: - ./zk-multiple-kafka-multiple/zoo2/data:/data - ./zk-multiple-kafka-multiple/zoo2/datalog:/datalog zoo3: image: zookeeper:3.4.9 hostname: zoo3 ports: - "2183:2183" environment: ZOO_MY_ID: 3 ZOO_PORT: 2183 ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 volumes: - ./zk-multiple-kafka-multiple/zoo3/data:/data - ./zk-multiple-kafka-multiple/zoo3/datalog:/datalog # 3개의 카프카 브로커를 구성한다. # Advertised listeners와 주키퍼 노드 포트를 등록한다. # 4개의 파티션을 구성한다. kafka1: image: confluentinc/cp-kafka:5.2.1 hostname: kafka1 ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" volumes: - ./zk-multiple-kafka-multiple/kafka1/data:/var/lib/kafka/data depends_on: - zoo1 - zoo2 - zoo3 kafka2: image: confluentinc/cp-kafka:5.2.1 hostname: kafka2 ports: - "9093:9093" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 2 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" volumes: - ./zk-multiple-kafka-multiple/kafka2/data:/var/lib/kafka/data depends_on: - zoo1 - zoo2 - zoo3 kafka3: image: confluentinc/cp-kafka:5.2.1 hostname: kafka3 ports: - "9094:9094" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 3 KAFKA_NUM_PARTITIONS: 4 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" volumes: - ./zk-multiple-kafka-multiple/kafka3/data:/var/lib/kafka/data depends_on: - zoo1 - zoo2 - zoo3 kafka-schema-registry: image: confluentinc/cp-schema-registry:5.2.1 hostname: kafka-schema-registry ports: - "8081:8081" environment: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092 SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 depends_on: - zoo1 - zoo2 - zoo3 - kafka1 - kafka2 - kafka3 schema-registry-ui: image: landoop/schema-registry-ui:0.9.4 hostname: kafka-schema-registry-ui ports: - "8001:8000" environment: SCHEMAREGISTRY_URL: http://kafka-schema-registry:8081/ PROXY: "true" depends_on: - kafka-schema-registry kafka-rest-proxy: image: confluentinc/cp-kafka-rest:5.2.1 hostname: kafka-rest-proxy ports: - "8082:8082" environment: # KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181 KAFKA_REST_LISTENERS: http://0.0.0.0:8082/ KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/ KAFKA_REST_HOST_NAME: kafka-rest-proxy KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092 depends_on: - zoo1 - zoo2 - zoo3 - kafka1 - kafka2 - kafka3 - kafka-schema-registry kafka-topics-ui: image: landoop/kafka-topics-ui:0.9.4 hostname: kafka-topics-ui ports: - "8000:8000" environment: KAFKA_REST_PROXY_URL: "http://kafka-rest-proxy:8082/" PROXY: "true" depends_on: - zoo1 - zoo2 - zoo3 - kafka1 - kafka2 - kafka3 - kafka-schema-registry - kafka-rest-proxy kafka-connect: image: confluentinc/cp-kafka-connect:5.2.1 hostname: kafka-connect ports: - kafka-schema-registry - kafka-rest-proxy kafka-connect: image: confluentinc/cp-kafka-connect:5.2.1 hostname: kafka-connect ports: - kafka-schema-registry - kafka-rest-proxy kafka-connect: image: confluentinc/cp-kafka-connect:5.2.1 hostname: kafka-connect ports: - "8083:8083" environment: CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092" CONNECT_REST_PORT: 8083 CONNECT_GROUP_ID: compose-connect-group CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081' CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081' CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect" CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO" CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR" CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1" CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1" CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1" CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars' volumes: - ./connectors:/etc/kafka-connect/jars/ depends_on: - zoo1 - zoo2 - zoo3 - kafka1 - kafka2 - kafka3 - kafka-schema-registry - kafka-rest-proxy kafka-connect-ui: image: landoop/kafka-connect-ui:0.9.4 hostname: kafka-connect-ui ports: - "8003:8000" environment: CONNECT_URL: "http://kafka-connect:8083/" PROXY: "true" depends_on: - kafka-connect ksql-server: image: confluentinc/cp-ksql-server:5.2.1 hostname: ksql-server ports: - "8088:8088" environment: KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092 KSQL_LISTENERS: http://0.0.0.0:8088/ KSQL_KSQL_SERVICE_ID: ksql-server_ depends_on: - zoo1 - zoo2 - zoo3 - kafka1 - kafka2 - kafka3 zoonavigator-web: image: elkozmon/zoonavigator-web:0.5.1 ports: - "8004:8000" environment: API_HOST: "zoonavigator-api" API_PORT: 9000 links: - zoonavigator-api depends_on: - zoonavigator-api zoonavigator-api: image: elkozmon/zoonavigator-api:0.5.1 environment: SERVER_HTTP_PORT: 9000 depends_on: - zoo1 - zoo2 - zoo3# docker-compose -f zk-multiple-kafka-multiple.yml up $ docker-compose ps /home/yundream/.local/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version! RequestsDependencyWarning) Name Command State Ports ------------------------------------------------------------------------------------------------------------------------------- kafkafullstack_kafka-connect-ui_1 /run.sh Up 0.0.0.0:8003->8000/tcp kafkafullstack_kafka-connect_1 /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp, 9092/tcp kafkafullstack_kafka-rest-proxy_1 /etc/confluent/docker/run Up 0.0.0.0:8082->8082/tcp kafkafullstack_kafka-schema-registry_1 /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp kafkafullstack_kafka-topics-ui_1 /run.sh Up 0.0.0.0:8000->8000/tcp kafkafullstack_kafka1_1 /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp kafkafullstack_kafka2_1 /etc/confluent/docker/run Up 9092/tcp, 0.0.0.0:9093->9093/tcp kafkafullstack_kafka3_1 /etc/confluent/docker/run Up 9092/tcp, 0.0.0.0:9094->9094/tcp kafkafullstack_ksql-server_1 /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp kafkafullstack_schema-registry-ui_1 /run.sh Up 0.0.0.0:8001->8000/tcp kafkafullstack_zoo1_1 /docker-entrypoint.sh zkSe ... Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp kafkafullstack_zoo2_1 /docker-entrypoint.sh zkSe ... Up 2181/tcp, 0.0.0.0:2182->2182/tcp, 2888/tcp, 3888/tcp kafkafullstack_zoo3_1 /docker-entrypoint.sh zkSe ... Up 2181/tcp, 0.0.0.0:2183->2183/tcp, 2888/tcp, 3888/tcp kafkafullstack_zoonavigator-api_1 ./run.sh Up 9000/tcp kafkafullstack_zoonavigator-web_1 ./run.sh Up 80/tcp, 0.0.0.0:8004->8000/tcp테스트
kafkacat
- -b : 카프카 브로커 주소 목록
- -t : 토픽
- -p : 파티션
- -P : 프로듀서 모드로 실행. 기본 파티션은 0이다.
- -C : 컨슈머 모드로 실행. -P, -C가 생략될 경우 기본 컨슈머 모드로 실행한다.
- -G : 컨슈머 그룹
앞서 만든 kafka의 정보를 확인해 보자. -L 을 이용하면 메타데이터 정보를 확인 할 수 있다.$ kafkacat -b localhost:9092 -L -t new_topic Metadata for new_topic (from broker -1: localhost:9092/bootstrap): 3 brokers: broker 2 at 127.0.0.1:9093 broker 3 at 127.0.0.1:9094 broker 1 at 127.0.0.1:9092 1 topics: topic "new_topic" with 4 partitions: partition 0, leader 1, replicas: 1, isrs: 1 partition 2, leader 3, replicas: 3, isrs: 3 partition 3, leader 1, replicas: 1, isrs: 1 partition 1, leader 2, replicas: 2, isrs: 2kafka topic ui
ZooNavigator
kafka 애플리케이션 테스트
- 프로듀서 : message 토픽으로 표준출력(키보드 입력)을 전송한다.
- 컨슈머 : message 토픽에서 읽은 데이터를 출력한다.
간단한 push 서비스 정도로 볼 수 있겠다.Go
package main import ( "bufio" "fmt" "github.com/Shopify/sarama" "os" ) type Producer struct { ChatProducer sarama.SyncProducer } // 프로듀서를 만든다. // 앞서 만든 브로커 정보들을 설정했다. func NewProducer() *Producer { config := sarama.NewConfig() config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Return.Successes = true c, err := sarama.NewSyncProducer([]string{ "localhost:9092", "localhost:9093", "localhost:9094"}, config) if err != nil { panic(err) } return &Producer{ChatProducer: c} } func (p *Producer) Close() error { err := p.ChatProducer.Close() if err != nil { return err } return nil } // message 토픽으로 메시지를 전송한다. func (p *Producer) SendStringData(message string) error { partition, offset, err := p.ChatProducer.SendMessage(&sarama.ProducerMessage{ Topic: "message", Value: sarama.StringEncoder(message), }) if err != nil { return err } fmt.Printf("%d/%d\n", partition, offset) return nil } func main() { p := NewProducer() reader := bufio.NewReader(os.Stdin) for { fmt.Print("> ") message, _ := reader.ReadString('\n') if message == "quit\n" { break } p.SendStringData(message) } }package main import ( "fmt" "log" "os" "os/signal" cluster "github.com/bsm/sarama-cluster" ) func main() { config := cluster.NewConfig() config.Consumer.Return.Errors = true config.Group.Return.Notifications = true brokers := []string{"localhost:9092", "localhost:9093", "localhost:9094"} topics := []string{"message"} consumer, err := cluster.NewConsumer(brokers, "testgroup", topics, config) if err != nil { panic(err) } defer consumer.Close() // trap SIGINT to trigger a shutdown. signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) // consume messages, watch errors and notifications for { select { case msg, more := <-consumer.Messages(): if more { fmt.Fprintf(os.Stdout, "msg [%d] [%d]> %s", msg.Partition, msg.Offset, msg.Value) consumer.MarkOffset(msg, "") // mark message as processed } case err, more := <-consumer.Errors(): if more { log.Printf("Error: %s\n", err.Error()) } case ntf, more := <-consumer.Notifications(): if more { log.Printf("Rebalanced: %+v\n", ntf) } case <-signals: return } } }# go run consumer_cluster.go 2019/06/07 00:53:12 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]} 2019/06/07 00:53:15 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1 2 3]] Released:map[] Current:map[message:[0 1 2 3]]}# go run consumer_cluster.go 2019/06/07 00:53:12 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]} 2019/06/07 00:53:15 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1 2 3]] Released:map[] Current:map[message:[0 1 2 3]]} 2019/06/07 01:13:16 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]} 2019/06/07 01:13:16 Rebalanced: &{Type:rebalance OK Claimed:map[message:[]] Released:map[message:[0 1]] Current:map[message:[2 3]]} # go run consumer_cluster.go 2019/06/07 01:13:14 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]} 2019/06/07 01:13:16 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1]] Released:map[] Current:map[message:[0 1]]}정리
Recent Posts
Archive Posts
Tags