python自定义进程池实例分析【生产者、消费者模型问题】

本文实例分析了python自定义进程池。分享给大家供大家参考,具体如下:

代码说明一切:

#encoding=utf-8
#author: walker
#date: 2014-05-21
#function: 自定义进程池遍历目录下文件
from multiprocessing import process, queue, lock
import time, os
#消费者
class consumer(process):
def __init__(self, queue, iolock):
super(consumer, self).__init__()
self.queue = queue
self.iolock = iolock
def run(self):
while true:
task = self.queue.get() #队列中无任务时,会阻塞进程
if isinstance(task, str) and task == ‘quit’:
break;
time.sleep(1) #假定任务处理需要1秒钟
self.iolock.acquire()
print( str(os.getpid()) + ‘ ‘ + task)
self.iolock.release()
self.iolock.acquire()
print ‘bye-bye’
self.iolock.release()
#生产者
def producer():
queue = queue() #这个队列是进程/线程安全的
iolock = lock()
subnum = 4 #子进程数量
workers = build_worker_pool(queue, iolock, subnum)
start_time = time.time()
for parent, dirnames, filenames in os.walk(r’d:\test’):
for filename in filenames:
queue.put(filename)
iolock.acquire()
print(‘qsize:’ + str(queue.qsize()))
iolock.release()
while queue.qsize() > subnum * 10: #控制队列中任务数量
time.sleep(1)
for worker in workers:
queue.put(‘quit’)
for worker in workers:
worker.join()
iolock.acquire()
print(‘done! time taken: {}’.format(time.time() – start_time))
iolock.release()
#创建进程池
def build_worker_pool(queue, iolock, size):
workers = []
for _ in range(size):
worker = consumer(queue, iolock)
worker.start()
workers.append(worker)
return workers
if __name__ == ‘__main__’:
producer()

ps:

self.iolock.acquire()

self.iolock.release()

可用:

with self.iolock:

替代。

再来一个好玩的例子:

#encoding=utf-8
#author: walker
#date: 2016-01-06
#function: 一个多进程的好玩例子
import os, sys, time
from multiprocessing import pool
cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
g_list = [‘a’]
#修改全局变量g_list
def modifydict_1():
global g_list
g_list.append(‘b’)
#修改全局变量g_list
def modifydict_2():
global g_list
g_list.append(‘c’)
#处理一个
def procone(num):
print(‘procone ‘ + str(num) + ‘, g_list:’ + repr(g_list))
#处理所有
def procall():
pool = pool(processes = 4)
for i in range(1, 20):
#procone(i)
#pool.apply(procone, (i,))
pool.apply_async(procone, (i,))
pool.close()
pool.join()
modifydict_1() #修改全局变量g_list
if __name__ == ‘__main__’:
modifydict_2() #修改全局变量g_list
print(‘in main g_list :’ + repr(g_list))
procall()

windows7 下运行的结果:

λ python3 demo.py
in main g_list :[‘a’, ‘b’, ‘c’]
procone 1, g_list:[‘a’, ‘b’]
procone 2, g_list:[‘a’, ‘b’]
procone 3, g_list:[‘a’, ‘b’]
procone 4, g_list:[‘a’, ‘b’]
procone 5, g_list:[‘a’, ‘b’]
procone 6, g_list:[‘a’, ‘b’]
procone 7, g_list:[‘a’, ‘b’]
procone 8, g_list:[‘a’, ‘b’]
procone 9, g_list:[‘a’, ‘b’]
procone 10, g_list:[‘a’, ‘b’]
procone 11, g_list:[‘a’, ‘b’]
procone 12, g_list:[‘a’, ‘b’]
procone 13, g_list:[‘a’, ‘b’]
procone 14, g_list:[‘a’, ‘b’]
procone 15, g_list:[‘a’, ‘b’]
procone 16, g_list:[‘a’, ‘b’]
procone 17, g_list:[‘a’, ‘b’]
procone 18, g_list:[‘a’, ‘b’]
procone 19, g_list:[‘a’, ‘b’]

ubuntu 14.04下运行的结果:

in main g_list :[‘a’, ‘b’, ‘c’]
procone 1, g_list:[‘a’, ‘b’, ‘c’]
procone 2, g_list:[‘a’, ‘b’, ‘c’]
procone 3, g_list:[‘a’, ‘b’, ‘c’]
procone 5, g_list:[‘a’, ‘b’, ‘c’]
procone 4, g_list:[‘a’, ‘b’, ‘c’]
procone 8, g_list:[‘a’, ‘b’, ‘c’]
procone 9, g_list:[‘a’, ‘b’, ‘c’]
procone 7, g_list:[‘a’, ‘b’, ‘c’]
procone 11, g_list:[‘a’, ‘b’, ‘c’]
procone 6, g_list:[‘a’, ‘b’, ‘c’]
procone 12, g_list:[‘a’, ‘b’, ‘c’]
procone 13, g_list:[‘a’, ‘b’, ‘c’]
procone 10, g_list:[‘a’, ‘b’, ‘c’]
procone 14, g_list:[‘a’, ‘b’, ‘c’]
procone 15, g_list:[‘a’, ‘b’, ‘c’]
procone 16, g_list:[‘a’, ‘b’, ‘c’]
procone 17, g_list:[‘a’, ‘b’, ‘c’]
procone 18, g_list:[‘a’, ‘b’, ‘c’]
procone 19, g_list:[‘a’, ‘b’, ‘c’]

可以看见windows7下第二次修改没有成功,而ubuntu下修改成功了。据uliweb作者limodou讲,原因是windows下是充重启实现的子进程;linux下是fork实现的。

更多python自定义进程池实例分析【生产者、消费者模型问题】相关文章请关注php中文网!

Posted in 未分类

发表评论