Python多线程(线程池)与多进程(进程池)简单使用

目前Python多线程不能利用CPU多核优势,IO密集型可用多线程,CPU密集型适合用多进程

首先定义一个用来并行跑的函数,加一个随机sleep时间好感受并行的结果,两个参数好理解并行时多参数怎么传递。

import random
def Test(a, b):
    time.sleep(random.randint(5, 20))
    print(str(a) + '_' + str(b) + '\t')

线程池

import random
import threadpool
def MultiThreadTest():
    pool = threadpool.ThreadPool(20)
    li = []
    for i in range(1000):
        li.append((None, {'a': i, 'b': i + 10}))
    requests = threadpool.makeRequests(Test, li)
    [pool.putRequest(req) for req in requests]
    pool.wait()

threadpool需要安装,pip就可以 多参数用 (None, {....}),当前版本threadpool 1.3.2是这么写的。

进程池

import multiprocessing
def MultiProcessTest():
    pool = multiprocessing.Pool(processes = 4)
    for i in range(1000):
        pool.apply_async(Test, (i, i + 10, ))
    pool.close()
    pool.join()

先close后join。

共享数据

另外,多线程可以用Python的Queue共享数据,多进程要用multiprocessing.Queue。

这里尝试用multiprocessing的dict保存数据。

import multiprocessing
def Test(a, b, mpDict):
    print(str(a) + "test", b)
    mpDict[str(a) + "test"] = b
def MultiProcessTest():
    pool = multiprocessing.Pool(processes=4)
    mpDict = multiprocessing.Manager().dict()
    for i in range(5):
        pool.apply_async(Test, (i, i + 10, mpDict, ))
    pool.close()
    pool.join()
    traditionDict = dict(mpDict)
    print(traditionDict)

生产者-消费者 模型

Pool 共享 Queue 有个 multiprocessing.Queue() 只支持 Process 出来的进程,不支持 Pool 的,在 Pool 中需要使用 multiprocessing.Manager()

下面代码为 1 个生产者和 4 个消费者的例子。

# 生产者
def write(q):
    a = np.random.randint(0, 100, (100, 2, 2))
    for value in range(a.shape[0]):
        print('Put %s to queue...\n' % a[value])
        q.put(a[value])
        print(q.qsize())


# 消费者:
def read(q):
    while True:
        # get的参数是 block=True, timeout=None
        # block表示队列空时是阻塞等待还是抛出异常
        # timeout指等待一定时间抛出异常,还是无限等待。
        value = q.get(True)
        print('Get %s from queue.\n' % value)
        print(q.qsize())
        time.sleep(random.random())


def test_pool():
    manager = mp.Manager()
    q = manager.Queue(2)
    pw = Process(target=write, args=(q,))
    pw.start()
    worker_num = 4
    pool = mp.Pool(processes=worker_num)
    for i in range(worker_num):
        print('start data worker ' + str(i))
        pool.apply_async(read, (q, ))
    pool.close()
    pw.join()
    pool.join()