dssim并行加速尝试-1

  1. sort的时候使用拓扑排序确定不同proxy的执行顺序,但是其中有部分是可以并行的,对sort的结果进行修改,变成二维列表。内层列表中的proxy可以并行。
    #before 
    sorted_proxies = List[proxy]
    proxy_sets =  Dict[str, List[proxy]] 
    
    #after
    sorted_proxies = List[ List[proxy] ] 
    proxy_sets =  Dict[str, List[List[proxy]]] 
    
  2. 使用multiprocessing.pool作为进程池,每次将一个内层列表中的proxy.func放入进程池,然后等待这一个列表中的任务都完成,再切换下一个列表。
  3. 整个求结过程是不停循环步长,每个步长中调用所有proxy的func,所以希望进程池不会在完成几个proxy.func的小任务后销毁,所有步长中都用这一个进程池,否则进程池的创建和销毁花费巨大。
  4. 这样就需要控制好同步的问题,一个内层list内部都要执行完了,才能开启下一个list。师兄说java有个countdownlatch可以控制同步,找了一下python中的pool.apply_async()的返回值res,有一个res.wait()方法,在主进程中调用就可以阻塞直到子进程结束。
    for x in range(1000):
        for i,group in enumerate(workers):
            for j,worker in enumerate(group):
                results = []
                result = p.apply_async(worker.do_something,
                                args= (value+i,q,i,j))
                results.append(result)
            [result.wait() for result in results]
    
            while True:
                if q.qsize() == 0:
                    break
                i,j,res = q.get()
                workers[i][j] = res
    
  5. 通信问题
    1. 子进程和主进程的内存是分开的,在子进程中对object做的修改不能影响主进程中object,可以使用multiprocessing.Manager.Queue通信
    class Worker:
     def __init__(self, i):
         self.id = i
         self.value = i
     def do_something(self, value, q,i,j):
         self.value = value
         time.sleep(2)
         print(os.getpid(),"id:",self.id, "do_something... self.value is ",self.value)
         q.put((i,j,self))
    
    1. 这个简单的例子中可以直接将子进程修改后的object对象put进queue,然后在子进程中对相应的object再赋值,结果6s(单进程10s)
    2. 但是这样要把位置参数i,j和队列queue传进去,接口都要修改。而且dssim中solve、output、updata都会更新proxy,而且solve还会调用output,接口什么的都要修改。

本文章使用limfx的vscode插件快速发布