本文和大家分享的主要是使用python
实现分布式任务相关内容,一起来看看吧,希望对大家
学习python有所帮助。
深入读了读python
的官方文档,发觉
Python
自带的
multiprocessing
模块有很多预制的接口可以方便的实现多个主机之间的通讯,进而实现典型的生产者
-
消费者模式的分布式任务架构。
之前,为了在Python
中实现生产者
-
消费者模式,往往就会选择一个额外的队列系统,比如
rabbitMQ
之类。此外,你有可能还要设计一套任务对象的序列化方式以便塞入队列。如果没有队列的支持,那不排除有些同学不得不从
socket
服务器做起,直接跟
TCP/IP
打起交道来。
其实multiprocessing.managers
中有个
BaseManager
就为开发者提供了这样一个快速接口。
我们假定的场景是1
个生产者(
producer.py
)
+8
个消费者
(worker.py)
的系统,还有一个中央节点负责协调(
server.py
)实现如下:
server.py
from multiprocessing.managers
import BaseManager
import Queue
queue = Queue.Queue() #
初始化一个
Q
,用于消息传递
class
QueueManager(BaseManager):
pass
QueueManager.register('get_queue', callable=
lambda:queue) #
在系统中发布
get_queue
这个业务
if __name__ == '__main__':
m = QueueManager(address=('10.239.85.193', 50000),authkey='abr' )
#
监听所有
10.239.85.193
的
50000
口
s = m.get_server()
s.serve_forever()
worker.py
from multiprocessing.managers
import BaseManager
from multiprocessing
import Pool
class
QueueManager(BaseManager):
pass
QueueManager.register('get_queue')
def
feb(i): #
经典的
'
山羊增殖
'
if i < 2:
return 1
if i < 5 :
return feb(i-1) + feb(i-2)
return feb(i-1) + feb(i-2) - feb(i-5)
def
worker(i):
m = QueueManager(address=('10.239.85.193', 50000), authkey='abr')#
连接
server
m.connect()
while
True:
queue = m.get_queue()#
获取
Q
c = queue.get()
print feb(c)
if __name__ == '__main__':
p = Pool(8) #
分进程启动
8
个
worker
p.map(worker, range(8))
producer.py
from multiprocessing.managers
import BaseManager
class
QueueManager(BaseManager):
pass
QueueManager.register('get_queue')
if __name__ == '__main__':
m = QueueManager(address=('10.239.85.193', 50000), authkey='abr')
m.connect()
i = 0
while
True:
queue = m.get_queue()
queue.put(48)
i+=1
系统会直接将Queue()
对象中的数据直接封装后通过
TCP 50000
端口在主机之间传递。不过需要注意的是,由于
authkey
的缘故,各个节点要求
python
的版本一致。
来源:
开源小站