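The problem, roughly: in a crawler I wrote a class with a method that takes an argument, and in main I use a process pool to hand that method out to worker processes as tasks.

If the method takes no argument, it runs. If it takes an argument, it never returns anything, and I suspect it never runs at all.

See the code below: when the pool is given the test method everything is fine, but when it is given get_singer_url with an argument, nothing happens. From what I found online, this problem apparently does not occur in Python 3 but does occur in Python 2.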

import requests
from lxml import etree
from multiprocessing import Pool
import time

class Singer_url(object):
    def __init__(self):
        # Adjacent string literals are joined, so no stray whitespace ends up inside the header value.
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) '
                                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                                      'Chrome/53.0.2785.143 Safari/537.36'}
        self.singer_url_list = []

    def get_page_url(self):
        # 65..90 are the ASCII codes of 'A'..'Z'
        page_url = ['http://music.163.com/discover/artist/cat?id=1001&initial={}'.format(i) for i in range(65, 91)]
        return page_url

    def get_singer_url(self,singer_url):
        print('test')
        rsp = requests.get(singer_url,headers = self.headers).content
        xml = etree.HTML(rsp)
        for i in xml.xpath('//ul[@class="m-cvrlst m-cvrlst-5 f-cb"]//li/div/a/@href|//li[@class="sml"]/a[1]/@href'):
            self.singer_url_list.append('http://music.163.com{}'.format(str(i)))
            #print(i)

    def test(self,i):
        print('test')

    def main(self):
        pool = Pool(5)
        page_url = self.get_page_url()
        #print(page_url)
        for task in  page_url:
            print(task)
            pool.apply_async(self.test,(task,))
            time.sleep(1)
            #print(self.singer_url_list)
        pool.close()
        pool.join()

if __name__ == '__main__':
    singer = Singer_url()
    singer.main()
When multiprocessing.Pool is given an ordinary function (one not defined inside a class), it works fine.
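from multiprocessing import Pool


def f(x):
    return x * x


# Define f before creating the pool so the worker processes can find it.
if __name__ == '__main__':
    p = Pool(3)
    print(p.map(f, [1, 2, 3]))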
But passing a method defined in a class to multiprocessing.Pool fails with a pickling error (Python 2):
cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
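
With apply_async, as used in the question, a failure like this can go unnoticed: the call returns an AsyncResult immediately, and in current Python versions an exception from the task is only re-raised when get() is called on that result. The sketch below illustrates the pattern in Python 3. The function fetch and the URLs are made up for illustration, and ThreadPool is used only so the snippet runs anywhere (it exposes the same apply_async API as Pool).

```python
from multiprocessing.pool import ThreadPool  # same apply_async API as multiprocessing.Pool


def fetch(url):
    # Hypothetical task: fails for one input to show how errors surface.
    if "bad" in url:
        raise ValueError("cannot fetch " + url)
    return len(url)


def run_tasks(urls):
    """Submit every URL, then collect (url, result-or-exception) pairs."""
    outcomes = []
    with ThreadPool(2) as pool:
        # Keep each AsyncResult; discarding it also discards any exception.
        pending = [(url, pool.apply_async(fetch, (url,))) for url in urls]
        for url, async_result in pending:
            try:
                outcomes.append((url, async_result.get(timeout=10)))
            except Exception as exc:  # re-raised here, in the caller
                outcomes.append((url, exc))
    return outcomes


if __name__ == "__main__":
    for url, outcome in run_tasks(["http://example.com/a", "http://example.com/bad"]):
        print(url, "->", repr(outcome))
```

Calling get() on every result is a cheap way to find out whether a task "did not run" or ran and failed.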

Cause:

The explanation on Stack Overflow:
Pool methods all use a queue.Queue to pass tasks to the worker processes. Everything that goes through the queue.Queue must be picklable. So, multiprocessing can only transfer Python objects to worker processes which can be pickled. Functions are only picklable if they are defined at the top-level of a module, bound methods are not picklable.

In other words, every task has to be serialized before it can cross the process boundary. In Python 2 a module-level function can be serialized, but a method bound to a class instance cannot, and that is what raises the exception above.

Link: https://stackoverflow.com/questions/1816958/cant-pickle-type-instancemethod-when-using-multiprocessing-pool-map
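
The rule can be checked directly with the pickle module, without starting any processes. This is a small sketch for Python 3 (the names square, MyTask, and can_pickle are made up for illustration). In Python 3 a bound method does pickle, which matches the observation that the problem does not appear there, while a lambda still fails because it cannot be looked up by name at module level.

```python
import pickle


def square(x):
    # Module-level function: pickled by reference to its qualified name.
    return x * x


class MyTask(object):
    def task(self, x):
        return x * x


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


if __name__ == "__main__":
    print(can_pickle(square))           # module-level function
    print(can_pickle(MyTask().task))    # bound method (picklable in Python 3)
    print(can_pickle(lambda x: x * x))  # lambda: no importable name
```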

Solutions:

  1. Replace the processes with threads.
  2. Use copy_reg to register a pickler for bound methods, which avoids the exception above.
  3. Use dill or pathos.multiprocessing instead of multiprocessing. pathos.multiprocessing is a fork of multiprocessing that uses dill. dill can serialize almost anything in Python, so you are able to send a lot more around in parallel.
Working code 1 (threads instead of processes):
# coding: utf8
from multiprocessing.pool import ThreadPool as Pool


class MyTask(object):
    def task(self, x):
        return x*x

    def run(self):
        pool = Pool(3)

        a = [1, 2, 3]
        ret = pool.map(self.task, a)
        print(ret)


if __name__ == '__main__':
    t = MyTask()
    t.run()
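
Because threads share memory with the caller, the pattern from the question (workers appending to a list on self) works with ThreadPool as written. Below is a minimal Python 3 sketch of that idea. LinkCollector, collect, and the fake page data are hypothetical stand-ins for the real requests/lxml code, so the snippet runs without network access.

```python
from multiprocessing.pool import ThreadPool


class LinkCollector(object):
    """Hypothetical stand-in for the crawler class in the question."""

    # Fake "pages": page URL -> links found on it (no network access needed).
    PAGES = {
        "page/65": ["/artist?id=1", "/artist?id=2"],
        "page/66": ["/artist?id=3"],
    }

    def __init__(self):
        self.links = []

    def collect(self, page_url):
        # Runs in a worker thread; self is the same object the caller holds.
        for href in self.PAGES.get(page_url, []):
            self.links.append("http://music.163.com{}".format(href))

    def main(self):
        pool = ThreadPool(5)
        results = [pool.apply_async(self.collect, (url,)) for url in self.PAGES]
        pool.close()
        pool.join()
        for r in results:
            r.get()  # re-raise any exception from a worker
        return sorted(self.links)


if __name__ == "__main__":
    print(LinkCollector().main())
```

Threads suit I/O-bound work such as crawling. For CPU-bound work, the GIL in CPython limits how much a thread pool can speed things up.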
Working code 2 (copy_reg, Python 2 only):
# coding: utf8
import multiprocessing
import types
import copy_reg


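def _pickle_method(m):
    # Reduce a method to (getattr, (owner, name)) so unpickling looks it up again.
    if m.im_self is None:
        # Unbound method: look the name up on the class.
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        # Bound method: look the name up on the instance.
        return getattr, (m.im_self, m.im_func.func_name)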


copy_reg.pickle(types.MethodType, _pickle_method)


class MyTask(object):
    def __init__(self):
        self.__result = []

    def task(self, x):
        return x * x

    def result_collector(self, result):
        self.__result.append(result)

    def run(self):
        pool = multiprocessing.Pool(processes=3)

        a = [1, 2, 3]
        ret = pool.map(self.task, a)
        print(ret)


if __name__ == '__main__':
    t = MyTask()
    t.run()
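
In Python 3 the copy_reg workaround is unnecessary, since bound methods can be pickled out of the box. One caveat still applies: each worker operates on its own copy of the instance, so results should come back as return values rather than through attributes on self. A minimal sketch, assuming Python 3 (the attribute seen is made up to illustrate the caveat):

```python
import multiprocessing


class MyTask(object):
    def __init__(self):
        self.seen = []  # only ever mutated inside worker copies, see below

    def task(self, x):
        self.seen.append(x)  # changes the worker's copy, not the parent's
        return x * x

    def run(self, values):
        with multiprocessing.Pool(processes=3) as pool:
            # self.task is pickled together with self and sent to the workers.
            return pool.map(self.task, values)


if __name__ == "__main__":
    t = MyTask()
    print(t.run([1, 2, 3]))  # squares, returned through the pool
    print(t.seen)            # still empty in the parent process
```

This is also why the question's get_singer_url would need to return its URLs when used with a process pool: appending to self.singer_url_list inside a worker process does not change the list in the parent.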