REDIS는 다양한 데이터 타입을 지원하는 메시지 큐로 알려져 있다. 그런 REDIS가 Pub/Sub를 지원한다. Pub/Sub는 메시지큐와는 특성이 다르다. Pub/Sub 시스템에서는 채널에 구독 신청을 한 모든 subscriber에게 메시지를 전달한다. 메시지를 "던지는" 시스템이기 때문에, 메시지를 보관하지도 않는다. 메시지 큐 본연의 목적과는 좀 다른 기능이라고 할 수 있겠다. 어쨋든 REDIS에서 제공하는 기능이고, 최근 효율적인 PUB/SUB 구축에 대한 요구도 있고해서 살펴보기로 했다. REDIS에 대한 기본적인 지식(설치, 간단한 운용, 지원하는 데이터 타입에 대한 이해)은 가지고 있다고 가정한다.
SUBSCRIBE 명령을 이용해서 채널을 구독할 수 있다. 매개변수로 채널 이름이 들어간다. 하나 이상의 채널에 대한 구독도 가능하다. test 채널과 qa 채널에 대해서 구독 신청을 했다.
> SUBSCRIBE test qa
PUBLISH 명령을 이용해서 채널에 메시지를 발행할 수 있다.
> PUBLISH test
Push 메시지 데이터 형식
Push 메시지는 3개의 요소들로 구성된 "배열값"을 전송한다.
Push 메시지 타입 : 어떤 종류의 메시지인지 알려준다. "subscribe", "message", "unsubscribe" 3가지 타입의 메시지가 있다.
# encoding: utf-8
require 'sinatra-websocket'
require 'redis'
require 'json'
class MyApp < Sinatra::Application
# 클라이언트(웹 브라우저)의 웹 소캣 객체를 저장한다.
$ws_a = Hash.new
Thread.new do
redis = Redis.new(:host=>'192.168.56.5')
redis.subscribe("message") do | on |
# 읽은 메시지는 모든 클라이언트 웹 소캣에 그대로 쓴다.
on.message do |channel, msg|
$ws_a.each do | name, ws|
puts "Send Data #{msg}"
ws.send(msg)
end
end
end
end
# localhost:3000/?name=yundream 형식으로 요청한다.
# name 파라메터는 클라이언트를 식별하기 위해서 사용하고 있다.
# 세션을 만들어야 겠지만 귀찮아서.(모든게 귀찮음 으로 해결된다.)
get "/" do
@name = params[:name]
erb :index
end
get "/message" do
@name = params[:name]
puts "User is #{@name}"
if !request.websocket?
erb :error
else
request.websocket do |ws|
# 웹 소켓이 연결되면 소켓 목록에 웹 소켓을
# 저장한다.
ws.onopen do
$ws_a[@name] = ws
puts "Connection OK #{@name}"
end
ws.onmessage do |msg|
end
ws.onclose do
if $ws_a.has_key? @name
puts "Remove websocket #{@name}"
$ws_a.delete @name
end
end
end
end
end
end
index.erb
아래는 Index.erb에 사용한 코드다. 주석으로 대신한다. 이왕 만드는 거 foundation으로 예쁘게 만들어 보고 싶었으나.. 귀찮아서 UI는 포기
<!doctype html>
<!--[if IE 9]><html class="lt-ie10" lang="en" > <![endif]-->
<!--[if IE 10]><html class="ie10" lang="en" > <![endif]-->
<html class="no-js" lang="en" data-useragent="Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)">
<head>
<link rel="stylesheet" href="/css/foundation.css" />
<link rel="stylesheet" href="/foundation-icons/foundation-icons.css" />
<script src="/js/vendor/modernizr.js"></script>
<script src="/js/vendor/jquery.js"></script>
<script src="/js/foundation/foundation.js"></script>
<script src="/js/foundation/foundation.reveal.js"></script>
<script src="/js/foundation/foundation.dropdown.js"></script>
<meta http-equiv="Content-Type" content="text/html;charset=UTF-8" />
<script>
// 웹 소켓으로 부터 읽은 데이터를 출력한다.
function addItem(obj) {
$("#container").append(
'<div class="large-12 columns">'+obj.topic+':'+obj.msg+'</div>')
}
// 웹 소켓으로 부터 데이터를 읽는다.
function message() {
var ws = new WebSocket('ws://' + window.location.host + '/message?name='+'yundream');
ws.onopen = function() {};
ws.onclose = function() {}
ws.onmessage = function(m) {
var obj = JSON.parse(m.data)
addItem(obj);
};
}
$(document).ready(function() {
message();
});
</script>
</head>
<body>
<!-- 데이터는 여기에 출력한다. -->
<div class="row" id="container">
</div>
</body>
테스트
웹 브라우저로 "localshot:3000/?name=username"형식으로 접근하면 된다.
PUB 클라이언트 만들기 귀찮아서 그냥 redis-cli로 테스트 했다.
# redis-cli -h 192.168.56.5
> PUBLISH message "{\"topic\":\"news\",\"msg\":\"This is web socket test\"}"
정리
MQTT와의 비교
MQTT도 비슷한 용도로 사용할 수 있는데(사실 MQTT의 전문분야라고 봐야 겠지만) 기능 보다는 사용환경에 있어서 차이가 있다. MQTT는 저전력, 신뢰할 수 없는 네트워크 환경에서의 작동을 목표로 하고 있다. 또한 QoS를 이용해서, 환경에 따라서 적절하게 서비스 품질을 조절할 수 있는 장점이 있다.
나는 IoT환경에서의 메시지 교환을 목적으로 REDIS를 테스트 했다. IoT 환경을 기준으로 내 생각을 정리 한다.
모바일 기기 혹은 IoT기기와의 통신에는 MQTT를 사용한다.
인프라 내부 컴포넌트들간의 메시지 교환에는 REDIS를 이용한다. 메시지 인프라에는 다양한 성격의 컴포넌트들이 존재한다. 이런 환경에서는 다양한 데이터 타입을 제공하는 REDIS가 강점을 가질 수 있다. 메시지 함, PUB/SUB 시스템, 연결 테이블 관리, 라우팅 테이블 관리등을 단일 소프트웨어로 구현할 수 있는 것은 큰 장점이다.
하고픈 것들
확장 가능한 REDIS PUB/SUB 시스템 설계. REDIS 클러스터 구현 쪽을 살펴봐야 겠다.
메시지 함의 구현
프로토 타이핑 프로그램의 개선. 지금은 딱 작동하는 수준으로 만들었다. UI도 그렇고.. GitHub에 올린다음 개선 작업을 해야 겠다.
Contents
Pub/Sub
Push 메시지 데이터 형식
- Push 메시지 타입 : 어떤 종류의 메시지인지 알려준다. "subscribe", "message", "unsubscribe" 3가지 타입의 메시지가 있다.
- subscribe : 채널을 성공적으로 subscribe 했다.
- message : 채널로 부터 전송된 일반 메시지
- unsubscribe : 채널을 성공적으로 unsubscribe 했다.
- Channel name : Subscribe한 채널 이름
- Message : 전송된 메시지
최초 Subscribe 했을 때의 메시지다.Database & Scope
패턴매칭 subscribe
프로그래밍
- 언어 : Ruby 2.1.2p95
- 운영체제 : 우분투 리눅스 14.10
redis gem을 설치하자PUB 프로그램
# cat pub.rb require 'redis' require 'json' class Publisher @name = nil @channel = nil @redis = nil def initialize args @name =args[:name] @channel = args[:channel] @redis = Redis.new(:host=>"192.168.57.2") end def run data = {"user"=>@name} loop do print "> " msg = STDIN.gets @redis.publish @channel, data.merge("msg"=>msg.strip).to_json end end end name = ARGV[0] channel = ARGV[1] puts "Name : #{name}" puts "channel : #{channel}" pub = Publisher.new({:name=>name, :channel=>channel}) pub.run# redis-cli -h 192.168.57.2 > MONITOR 1415955481.013326 [0 192.168.57.1:37574] "publish" "chatting" "{\"user\":\"yundream\",\"msg\":\"Hello world\"}"SUB 프로그램
require 'rubygems' require 'redis' require 'json' class Subscriber @channel = nil @redis = nil def initialize args @channel = args[:channel] @redis = Redis.new(:host=>"192.168.57.2") end def run @redis.subscribe(@channel) do |on| on.message do |channel, msg| data = JSON.parse(msg) puts "##{channel} #{data['user']} : #{data['msg']}" end end end end channel = ARGV[0] puts "channel : #{channel}" sub = Subscriber.new({:channel=>channel}) sub.run응용
구성
- Sinatra는 Sub Client를 별도의 쓰레드로 운용한다. REDIS Server로 부터 SUB한 정보를 웹 소켓에 쓴다. Sinatra는 웹 소켓 목록을 유지하고 있어야 한다.
- Web browser와 Sinatra와는 websocket로 연결한다.
- PUB Client 프로그램으로 REDIS PUB Server에 메시지를 publish 한다.
- SUB Client는 REDIS Server로 부터 메시지를 읽고, 연결한 웹 소켓에 쓴다.
제대로 구현하려면 회색부분도 추가해야 할 거다. 유저가 연결하지 않은 상태에서 전달된 메시지는 메시지함(REDIS Msg Queue)에 적재 하는등의 컴포넌트가 필요하지만, 완전히 작동하는 웹 애플리케이션을 만드는게 목적은 아니니 구현은 하지 않기로 한다.개발 범위
- 유저는 구독할 뉴스 토픽을 선택
- 웹 애플리케이션 서버는 유저가 구독한 토픽에 대해서만 PUB 해야 할 것이다.
하지만 모든 토픽을 클라이언트에게 전달할 것이다. 구현하기 귀찮다.메시지 포멧
{ "topic": "토픽 이름", "msg": "메시지" }웹 애플리케이션 서버
# encoding: utf-8 require 'sinatra-websocket' require 'redis' require 'json' class MyApp < Sinatra::Application # 클라이언트(웹 브라우저)의 웹 소캣 객체를 저장한다. $ws_a = Hash.new Thread.new do redis = Redis.new(:host=>'192.168.56.5') redis.subscribe("message") do | on | # 읽은 메시지는 모든 클라이언트 웹 소캣에 그대로 쓴다. on.message do |channel, msg| $ws_a.each do | name, ws| puts "Send Data #{msg}" ws.send(msg) end end end end # localhost:3000/?name=yundream 형식으로 요청한다. # name 파라메터는 클라이언트를 식별하기 위해서 사용하고 있다. # 세션을 만들어야 겠지만 귀찮아서.(모든게 귀찮음 으로 해결된다.) get "/" do @name = params[:name] erb :index end get "/message" do @name = params[:name] puts "User is #{@name}" if !request.websocket? erb :error else request.websocket do |ws| # 웹 소켓이 연결되면 소켓 목록에 웹 소켓을 # 저장한다. ws.onopen do $ws_a[@name] = ws puts "Connection OK #{@name}" end ws.onmessage do |msg| end ws.onclose do if $ws_a.has_key? @name puts "Remove websocket #{@name}" $ws_a.delete @name end end end end end endindex.erb
<!doctype html> <!--[if IE 9]><html class="lt-ie10" lang="en" > <![endif]--> <!--[if IE 10]><html class="ie10" lang="en" > <![endif]--> <html class="no-js" lang="en" data-useragent="Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)"> <head> <link rel="stylesheet" href="/css/foundation.css" /> <link rel="stylesheet" href="/foundation-icons/foundation-icons.css" /> <script src="/js/vendor/modernizr.js"></script> <script src="/js/vendor/jquery.js"></script> <script src="/js/foundation/foundation.js"></script> <script src="/js/foundation/foundation.reveal.js"></script> <script src="/js/foundation/foundation.dropdown.js"></script> <meta http-equiv="Content-Type" content="text/html;charset=UTF-8" /> <script> // 웹 소켓으로 부터 읽은 데이터를 출력한다. function addItem(obj) { $("#container").append( '<div class="large-12 columns">'+obj.topic+':'+obj.msg+'</div>') } // 웹 소켓으로 부터 데이터를 읽는다. function message() { var ws = new WebSocket('ws://' + window.location.host + '/message?name='+'yundream'); ws.onopen = function() {}; ws.onclose = function() {} ws.onmessage = function(m) { var obj = JSON.parse(m.data) addItem(obj); }; } $(document).ready(function() { message(); }); </script> </head> <body> <!-- 데이터는 여기에 출력한다. --> <div class="row" id="container"> </div> </body>테스트
# redis-cli -h 192.168.56.5 > PUBLISH message "{\"topic\":\"news\",\"msg\":\"This is web socket test\"}"정리
MQTT와의 비교
하고픈 것들
참고
Recent Posts
Archive Posts
Tags