使用消息队列在数据的通信中拥有很多优点,snakemq是一个开源的用python实现的跨平台mq库,well,python的消息队列包snakemq使用初探,here we go:
一、关于snakemq的官方介绍
snakemq的github项目页:https://github.com/dsiroky/snakemq
1.纯python实现,跨平台
2.自动重连接
3.可靠发送–可配置的消息方式与消息超时方式
4.持久化/临时 两种队列
5.支持异步 — poll()
6.symmetrical — 单个tcp连接可用于双工通讯
7.多数据库支持 — sqlite、mongodb……
8.brokerless – 类似zeromq的实现原理
9.扩展模块:rpc, bandwidth throttling
以上都是官话,需要自己验证,动手封装了一下,感觉萌萌哒。
二、几个主要问题说明
1.支持自动重连,不需要自己动手写心跳逻辑,你只需要关注发送和接收就行
2.支持数据持久化,如果开启持久化,在重连之后会自动发送数据。
3.数据的接收,snakemq通过提供回调实现,你只需要写个接收方法添加到回调列表里去。
4.数据的发送,在此发送的都是bytes类型(二进制),因此需要转换。我在程序中测试的都是文本字符串,使用str.encode('utf-8')转换成bytes,接收时再用bytes.decode('utf-8')转换回来。
5.术语解释,connector:类似于socket的tcpclient,listener:类似于socket的tcpserver,每个connector或者listener都有一个ident标识,发送和接收数据时就知道是谁的数据了。
6.使用sqlite持久化时,需要修改源码,sqlite3.connect(filename, check_same_thread=False),用于解决多线程访问sqlite的问题。(会不会死锁?)
7.启动持久化时,如果重新连上,则会自动发送,保证可靠。
8.为了封装的需要,数据接收以后,我通过callback方式传送出去。
三、代码
说明:代码中使用了自定义的日志模块:
from common import nxlogger
import snakemqlogger as logger
可替换成logging的。
回调类(callbacks.py):
# -*- coding:utf-8 -*-
'''synchronized callback'''


class callback(object):
    """Ordered collection of callables that are all invoked when the instance is called.

    Handlers run in registration order and receive exactly the arguments
    passed to the instance call. Return values are discarded.
    """

    def __init__(self):
        # Registered handlers, in the order they were added.
        self.callbacks = []

    def add(self, func):
        """Register ``func`` to be invoked on every subsequent call."""
        self.callbacks.append(func)

    def remove(self, func):
        """Unregister ``func``.

        Raises:
            ValueError: if ``func`` is not currently registered.
        """
        self.callbacks.remove(func)

    def __call__(self, *args, **kwargs):
        """Invoke every registered handler with the given arguments."""
        # Iterate over a snapshot: a handler may add or remove handlers
        # (including itself) without causing later handlers to be skipped.
        for func in tuple(self.callbacks):
            func(*args, **kwargs)
connector类(snakemqconnector.py):
# -*- coding:utf-8 -*-
import threading
import snakemq
import snakemq.link
import snakemq.packeter
import snakemq.messaging
import snakemq.message
from snakemq.storage.sqlite import sqlitequeuesstorage
from snakemq.message import flag_persistent
from common.callbacks import callback
from common import nxlogger
import snakemqlogger as logger
class snakemqconnector(threading.Thread):
    """Thread that owns a snakeMQ link connecting to a remote listener.

    Received messages are UTF-8 decoded and dispatched through ``on_recv``
    as ``(ident, text)``. Outgoing messages are queued with ``sendmsg``.

    Args:
        snakemqident: identifier passed to ``snakemq.messaging.Messaging``.
        remoteip: host of the remote listener.
        remoteport: TCP port of the remote listener.
        persistent: when true, messages are flagged persistent and an
            SQLite queue storage (``snakemqstorage.db``) is used.
    """

    def __init__(self, snakemqident=None, remoteip="localhost", remoteport=9090, persistent=False):
        super(snakemqconnector, self).__init__()
        self.messaging = None
        self.link = None
        self.snakemqident = snakemqident
        self.pktr = None
        self.remoteip = remoteip
        self.remoteport = remoteport
        self.persistent = persistent
        # Subscribers are called as func(ident, text) for every received message.
        self.on_recv = callback()
        self._initconnector()

    def run(self):
        """Thread body: run the link loop until ``terminate`` stops it."""
        logger.info("connector start…")
        if self.link is not None:
            self.link.loop()
        logger.info("connector end…")

    def terminate(self):
        """Stop the link loop and release the link's resources."""
        logger.info("connetor terminating…")
        if self.link is not None:
            self.link.stop()
            self.link.cleanup()
        logger.info("connetor terminated")

    def on_recv_message(self, conn, ident, message):
        """Decode a received message as UTF-8 and dispatch it to ``on_recv``.

        Any exception (decode failure or subscriber error) is logged and
        printed instead of propagating.
        """
        try:
            # dispatch received data
            self.on_recv(ident, message.data.decode('utf-8'))
        except Exception as e:
            logger.error("connector recv:{0}".format(e))
            print(e)

    def sendmsg(self, destident, byteseq):
        """Send ``byteseq`` (bytes) to the peer named ``destident`` with a 60 s TTL.

        Logs an error and returns without sending when messaging was not
        initialized.
        """
        # Check first so no message object is built when it cannot be sent.
        if self.messaging is None:
            logger.error("connector:messaging is not initialized, send message failed")
            return
        if self.persistent:
            msg = snakemq.message.Message(byteseq, ttl=60,
                                          flags=snakemq.message.FLAG_PERSISTENT)
        else:
            msg = snakemq.message.Message(byteseq, ttl=60)
        self.messaging.send_message(destident, msg)

    def _initconnector(self):
        """Build the link, packeter and messaging stack and hook up the receive callback.

        Failures are logged rather than raised; attributes that were not
        reached keep their ``None`` value.
        """
        try:
            self.link = snakemq.link.Link()
            self.link.add_connector((self.remoteip, self.remoteport))
            self.pktr = snakemq.packeter.Packeter(self.link)
            if self.persistent:
                # Imported here so the class does not depend on a module-level
                # alias for the storage backend.
                from snakemq.storage.sqlite import SqliteQueuesStorage
                storage = SqliteQueuesStorage("snakemqstorage.db")
                self.messaging = snakemq.messaging.Messaging(
                    self.snakemqident, "", self.pktr, storage)
            else:
                self.messaging = snakemq.messaging.Messaging(
                    self.snakemqident, "", self.pktr)
            self.messaging.on_message_recv.add(self.on_recv_message)
        except Exception as e:
            logger.error("connector:{0}".format(e))
        else:
            # Nothing has looped yet at this point; report initialization only.
            logger.info("connector[{0}] initialized".format(self.snakemqident))
listener类(snakemqlistener.py):
# -*- coding:utf-8 -*-
import threading
import snakemq
import snakemq.link
import snakemq.packeter
import snakemq.messaging
import snakemq.message
from common import nxlogger
import snakemqlogger as logger
from common.callbacks import callback
class snakemqlistener(threading.Thread):
    """Thread that owns a snakeMQ link listening for incoming connections.

    Received messages are UTF-8 decoded and dispatched through ``on_recv``
    as ``(ident, text)``; a greeting is then sent back to the sender.

    Args:
        snakemqident: identifier passed to ``snakemq.messaging.Messaging``.
        ip: local address to listen on.
        port: local TCP port to listen on.
        persistent: when true, messages are flagged persistent and an
            SQLite queue storage (``snakemqstorage.db``) is used.
    """

    def __init__(self, snakemqident=None, ip="localhost", port=9090, persistent=False):
        super(snakemqlistener, self).__init__()
        self.messaging = None
        self.link = None
        self.pktr = None
        self.snakemqident = snakemqident
        self.ip = ip
        self.port = port
        # Currently connected peers, keyed by the value passed to on_connect.
        self.connectors = {}
        # Subscribers are called as func(ident, text) for every received message.
        self.on_recv = callback()
        self.persistent = persistent
        self._initlistener()

    def run(self):
        """Thread body: run the link loop until ``terminate`` stops it."""
        logger.info("listener start…")
        if self.link is not None:
            self.link.loop()
        logger.info("listener end…")

    def terminate(self):
        """Stop the link loop and release the link's resources."""
        logger.info("listener terminating…")
        if self.link is not None:
            self.link.stop()
            self.link.cleanup()
        logger.info("listener terminated")

    def on_recv_message(self, conn, ident, message):
        """Dispatch a received message to ``on_recv`` and greet the sender.

        Any exception (decode failure, subscriber error, send failure) is
        logged and printed instead of propagating.
        """
        try:
            # dispatch received data
            self.on_recv(ident, message.data.decode('utf-8'))
            # Reply to whoever sent the message rather than to one fixed identity.
            self.sendmsg(ident, 'hello,{0}'.format(ident).encode('utf-8'))
        except Exception as e:
            logger.error("listener recv:{0}".format(e))
            print(e)

    def on_drop_message(self, ident, message):
        """Report a message that the messaging layer dropped."""
        print("message dropped", ident, message)
        logger.debug("listener:message dropped,ident:{0},message:{1}".format(ident, message))

    def on_connect(self, ident):
        """Record a newly connected peer and send it a greeting."""
        # NOTE(review): this handler is registered on the packeter; snakeMQ
        # appears to document that callback as func(conn_id), so ``ident`` may
        # be a connection id rather than a peer identifier — confirm.
        logger.debug("listener:{0} connected".format(ident))
        self.connectors[ident] = ident
        self.sendmsg(ident, "hello".encode('utf-8'))

    def on_disconnect(self, ident):
        """Forget a peer that disconnected; unknown peers are ignored."""
        logger.debug("listener:{0} disconnected".format(ident))
        self.connectors.pop(ident, None)

    def _initlistener(self):
        """Build the link, packeter and messaging stack and hook up all callbacks.

        Failures are logged rather than raised; attributes that were not
        reached keep their ``None`` value.
        """
        try:
            self.link = snakemq.link.Link()
            self.link.add_listener((self.ip, self.port))
            self.pktr = snakemq.packeter.Packeter(self.link)
            self.pktr.on_connect.add(self.on_connect)
            self.pktr.on_disconnect.add(self.on_disconnect)
            if self.persistent:
                # The storage backend is not imported at module level in this
                # file, so bring it into scope here.
                from snakemq.storage.sqlite import SqliteQueuesStorage
                storage = SqliteQueuesStorage("snakemqstorage.db")
                self.messaging = snakemq.messaging.Messaging(
                    self.snakemqident, "", self.pktr, storage)
            else:
                self.messaging = snakemq.messaging.Messaging(
                    self.snakemqident, "", self.pktr)
            self.messaging.on_message_recv.add(self.on_recv_message)
            self.messaging.on_message_drop.add(self.on_drop_message)
        except Exception as e:
            logger.error("listener:{0}".format(e))
        else:
            # Nothing has looped yet at this point; report initialization only.
            logger.info("listener:initialized")

    def sendmsg(self, destident, byteseq):
        """Send ``byteseq`` (bytes) to the peer named ``destident`` with a 60 s TTL.

        Logs an error and returns without sending when messaging was not
        initialized.
        """
        # Check first so no message object is built when it cannot be sent.
        if self.messaging is None:
            logger.error("listener:messaging is not initialized, send message failed")
            return
        if self.persistent:
            # Referenced through the module: no bare flag name is imported here.
            msg = snakemq.message.Message(byteseq, ttl=60,
                                          flags=snakemq.message.FLAG_PERSISTENT)
        else:
            msg = snakemq.message.Message(byteseq, ttl=60)
        self.messaging.send_message(destident, msg)
测试代码connector(testsnakeconnector.py):
读取本地一个1m的文件,然后发送给listener,然后listener发回一个hello的信息。
from netcomm.snakemq import snakemqconnector
import time
import sys
import os
def received(ident, data):
    """Print the text of a message received from the peer ``ident``."""
    print(data)
if __name__ == "__main__":
    # Connect as 'bob' with persistence enabled and print whatever comes back.
    bob = snakemqconnector.snakemqconnector('bob', "10.16.5.45", 4002, True)
    bob.on_recv.add(received)
    bob.start()
    try:
        with open("testfile.txt", encoding='utf-8') as f:
            txt = f.read()
        # The payload is identical for every send, so encode it once.
        payload = txt.encode('utf-8')
        for i in range(100):
            bob.sendmsg("niess", payload)
            time.sleep(0.1)
    except Exception as e:
        print(e)
    finally:
        # Leave time for queued messages and replies, then always stop the
        # link thread, even when the run is interrupted.
        time.sleep(5)
        bob.terminate()
测试代码listener(testsnakelistener.py):
from netcomm.snakemq import snakemqlistener
import time
def received(ident, data):
    """Write the text of one received message to ``log/recfile<epoch-seconds>.txt``.

    Messages that arrive within the same second share a file name, so the
    later one overwrites the earlier one.
    """
    import os  # local import: this script only imports ``time`` at the top

    # The target directory may not exist on a fresh checkout.
    os.makedirs("log", exist_ok=True)
    # Epoch seconds computed directly; the '%s' strftime directive is not portable.
    filename = "log/recfile{0}.txt".format(int(time.time()))
    # Explicit UTF-8 so non-ASCII text does not depend on the platform default.
    with open(filename, 'w', encoding='utf-8') as out:
        out.write(data)
if __name__ == "__main__":
    # Listen as 'niess' for one minute, saving every received message to disk.
    niess = snakemqlistener.snakemqlistener("niess", "10.16.5.45", 4002)
    niess.on_recv.add(received)
    niess.start()
    print("niess start…")
    try:
        time.sleep(60)
    finally:
        # Always stop the link thread, even when the wait is interrupted.
        niess.terminate()
    print("niess end…")
更多python的消息队列包snakemq使用相关文章请关注php中文网!