python自定义主从分布式架构实例分析

这篇文章主要介绍了python自定义主从分布式架构,结合实例形式分析了主从分布式架构的结构、原理与具体的代码实现技巧,需要的朋友可以参考下

本文实例讲述了python自定义主从分布式架构。分享给大家供大家参考,具体如下:

环境:win7 x64,python 2.7,apscheduler 2.1.2。

原理图如下:

代码部分:

(1)、中心节点:

#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 中心节点(主要功能是分配任务)
import socketserver, socket, queue
centerip = ‘127.0.0.1’ #中心节点ip
centerlistenport = 9999 #中心节点监听端口
centerclient = socket.socket(socket.af_inet, socket.sock_dgram) #中心节点用于发送网络消息的socket
taskqueue = queue.queue() #任务队列
#获取任务队列
def gettaskqueue():
for i in range(1, 11):
taskqueue.put(str(i))
#centerserver的回调函数,在接受到udp报文是触发
class myudphandler(socketserver.baserequesthandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print(data)
if data.startswith(‘wait’):
vec = data.split(‘:’)
if len(vec) != 3:
print(‘error: len(vec) != 3’)
else:
nodeip = vec[1]
nodelistenport = vec[2]
nodeid = nodeip + ‘:’ + nodelistenport
if not taskqueue.empty():
task = taskqueue.get()
print(‘send task ‘ + task + ‘ to ‘ + nodeid)
centerclient.sendto(‘task:’ + task, (nodeip, int(nodelistenport)))
else:
print(‘taskqueue is empty!’)
gettaskqueue() #获取任务队列
centerserver = socketserver.udpserver((centerip, centerlistenport), myudphandler)
print(‘listen port ‘ + str(centerlistenport) + ‘ …’)
centerserver.serve_forever()

(2)、任务节点:

#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 任务节点(请求/接收/执行任务)
import time, socket, socketserver
from apscheduler.scheduler import scheduler
centerip = ‘127.0.0.1’ #中心节点ip
centerlistenport = 9999 #中心节点监听端口
nodeip = socket.gethostbyname(socket.gethostname()) #任务节点自身ip
nodeclient = socket.socket(socket.af_inet, socket.sock_dgram) #任务节点用于发送网络消息的socket
#任务:发送网络信息
def jobsendnetmsg():
msg = ”
if nodeserver.taskstate == ‘wait’:
msg = ‘wait:’ + nodeip + ‘:’ + str(nodelistenport)
elif nodeserver.taskstate == ‘exec’:
msg = ‘exec:’ + nodeip + ‘:’ + str(nodelistenport)
print(msg)
nodeclient.sendto(msg, (centerip, centerlistenport))
#添加并启动定时任务
def inittimer():
sched = scheduler()
sched.add_interval_job(jobsendnetmsg, seconds=1)
sched.start()
#执行任务
def exectask(task):
print(‘exectask ‘ + task + ‘ …’)
time.sleep(2)
print(‘exectask ‘ + task + ‘ over’)
#nodeserver的回调函数,在接受到udp报文是触发
class myudphandler(socketserver.baserequesthandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print(‘recv data: ‘ + data)
if data.startswith(‘task’):
vec = data.split(‘:’)
if len(vec) != 2:
print(‘error: len(vec) != 2’)
else:
task = vec[1]
self.server.taskstate = ‘exec’
exectask(task)
self.server.taskstate = ‘wait’
inittimer()
nodeserver = socketserver.udpserver((”, 0), myudphandler)
nodeserver.taskstate = ‘wait’ #(exec/wait)
nodelistenport = nodeserver.server_address[1]
print(‘nodelistenport:’ + str(nodelistenport))
nodeserver.serve_forever()

更多python自定义主从分布式架构实例分析相关文章请关注php中文网!

Posted in 未分类

发表评论