新聞中心
這里有您想知道的互聯網營銷解決方案
講解一下Redis中的訂閱發(fā)布模式
發(fā)布與訂閱模型在許多編程語言中都有實現,也就是我們經常說的設計模式中的一種–觀察者模式。在一些應用場合,例如發(fā)送方并不是以固定頻率發(fā)送消息,如果接收方頻繁去咨詢發(fā)送方,這種操作無疑是很麻煩并且不友好的。

為什么做訂閱分布?
隨著業(yè)務復雜, 業(yè)務的項目依賴關系增強, 使用消息隊列幫助系統(tǒng)降低耦合度.
-
訂閱分布本身也是一種生產者消費者模式, 訂閱者是消費者, 發(fā)布者是生產者.
-
訂閱發(fā)布模式, 發(fā)布者發(fā)布消息后, 只要有訂閱方, 則多個訂閱方會收到同樣的消息
-
生產者消費者模式, 生產者往隊列里放入消息, 由多個消費者對一條消息進行搶占.
-
訂閱分布模式可以將一些不著急完成的工作放到其他進程或者線程中進行離線處理.
Redis中的訂閱發(fā)布
“
Redis中的訂閱發(fā)布模式, 當沒有訂閱者時, 消息會被直接丟棄(Redis不會持久化保存消息)
”
Redis生產者消費者
生產者使用Redis中的list數據結構進行實現, 將待處理的消息塞入到消息隊列中.
class Producer(object):
def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self.key = "test_key"
self.value = "test_value_{id}"
def produce(self):
for id in xrange(5):
msg = self.value.format(id=id)
self._conn.lpush(self.key, msg)
消費者使用redis中brpop進行實現, brpop會從list頭部消息, 并能夠設置超時等待時間.
class Consumer(object):
def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self.key = "test_key"
def consume(self, timeout=0):
# timeout=0 表示會無線阻塞, 直到獲得消息
while True:
msg = self._conn.brpop(self.key, timeout=timeout)
process(msg)
def process(msg):
print msg
if __name__ == '__main__':
consumer = Consumer()
consumer.consume()
# 輸出結果
('test_key', 'test_value_1')
('test_key', 'test_value_2')
('test_key', 'test_value_3')
('test_key', 'test_value_4')
('test_key', 'test_value_5')
Redis中訂閱發(fā)布
在Redis Pubsub中, 一個頻道(channel)相當于一個消息隊列
class Publisher(object):
def __init__(self, host, port):
self._conn = redis.StrictRedis(host=host, port=port)
self.channel = "test_channel"
self.value = "test_value_{id}"
def pub(self):
for id in xrange(5):
msg = self.value.format(id=id)
self._conn.publish(self.channel, msg)
其中get_message使用了select IO多路復用來檢查socket連接是否是否可讀.
class Subscriber(object):
def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self._pubsub = self._conn.pubsub() # 生成pubsub對象
self.channel = "test_channel"
self._pubsub.subscribe(self.channel)
def sub(self):
while True:
msg = self._pubsub.get_message()
if msg and isinstance(msg.get("data"), basestring):
process(msg.get("data"))
def close(self):
self._pubsub.close()
# 輸出結果
test_value_1
test_value_2
test_value_3
test_value_4
test_value_
Java Jedis踩過的坑
在Jedis中訂閱方處理是采用同步的方式, 看源碼中PubSub模塊的process函數
在do-while循環(huán)中, 會等到當前消息處理完畢才能夠處理下一條消息, 這樣會導致當入隊列消息量過大的時候, redis鏈接被強制關閉.
解決方案: 將整個處理函數改為異步的方式.
網頁標題:講解一下Redis中的訂閱發(fā)布模式
當前鏈接:http://www.dlmjj.cn/article/dpieppi.html


咨詢
建站咨詢
