I wrote my own parameter-search implementation, mainly because I don't need the cross-validation that scikit-learn's GridSearchCV and RandomizedSearchCV perform. I use Dask to get good distributed performance. Here is what I have:

```python
import numpy as np
from scipy.stats import uniform

class Params(object):
    def __init__(self, fixed, loc=0.0, scale=1.0):
        self.fixed = fixed
        self.sched = uniform(loc=loc, scale=scale)

    def _getsched(self, i, size):
        # Reproducible draw: the trial index i doubles as the random seed.
        return self.sched.rvs(size=size, random_state=i)

    def param(self, i, size=None):
        tmp = self.fixed.copy()
        if size is None:
            size = tmp['niter']
        tmp.update({'schd': self._getsched(i, size)})
        return tmp

class Mymodel(object):
    def __init__(self, func, params_object, score, ntries, client):
        self.params = params_object
        self.func = func
        self.score = score
        self.ntries = ntries
        self.client = client

    def _run(self, params, train, test):
        return self.func(params, train, test, self.score)

    def build(self, train, test):
        res = []
        for i in range(self.ntries):
            cparam = self.params.param(i)
            res.append((cparam,
                        self.client.submit(self._run, cparam, train, test)))
        self._results = res
        return res

    def compute_optimal(self, res=None):
        from operator import itemgetter
        if res is None:
            res = self._results
        self._sorted = sorted(self.client.compute(res), key=itemgetter(1))
        return self._sorted[0]

def score(test, correct):
    return np.linalg.norm(test - correct)

def myfunc(params, ldata, data, score):
    schd = params['schd']
    niter = len(schd)
    # here I do some magic after which ldata is changing
    return score(test=ldata, correct=data)
```

After I start dask.distributed:

```python
from distributed import Client

scheduler_host = 'myhostname:8786'
cli = Client(scheduler_host)
```

I run it like this:

```python
%%time
params = Params({'niter': 50}, loc=1.0e-06, scale=1.0)
model = Mymodel(myfunc, params, score, 100, cli)
ptdata = bad_data_example.copy()
graph = model.build(ptdata, good_data)
```

And get this:

```
distributed.protocol.pickle - INFO - Failed to serialize
<bound method Mymodel._run of <__main__.Mymodel object at 0x2b8961903050>>.
Exception: can't pickle thread.lock objects
```

Could you please help me understand what is going on and how to fix it?

I'm also curious about the way I find the minimum across all the parameter results. Is there a better way to do it with Dask?

I wrote this code fairly quickly and never tried it in serial. I'm learning Dask alongside many other topics (machine learning, GPU programming, Numba, Python OOP, etc.), so this code is not optimal by any means.

P.S. To actually execute it I use this call: `model.compute_optimal()`. I haven't got that far yet, due to the error above.

Solution

It looks like the main issue was that I tried to submit a bound method: `self._run` drags the whole `Mymodel` instance along with it, including `self.client`, and the client holds a `thread.lock` that cannot be pickled (which is exactly what the traceback says). I had similar issues with joblib as well. So I re-coded the problem and removed all the classes; a sketch of the class-free approach follows below.

The remaining optimization questions are posted here: Parameter search using dask.

I'll definitely use dask-searchcv in my work when I need cross-validation, but for now this is really only a simple search for an optimal solution, so I had to create my own implementation.
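For illustration, here is a minimal sketch of what the class-free version can look like. The helper names `make_params` and `run_trial` are illustrative, the "magic" step is left as a placeholder, and `bad_data_example` / `good_data` are assumed to be NumPy arrays defined as in the question:

```python
import numpy as np
from scipy.stats import uniform
from distributed import Client

def score(test, correct):
    return np.linalg.norm(test - correct)

def make_params(i, niter=50, loc=1.0e-06, scale=1.0):
    # Reproducible schedule: the trial index doubles as the random seed.
    schd = uniform(loc=loc, scale=scale).rvs(size=niter, random_state=i)
    return {'niter': niter, 'schd': schd}

def run_trial(params, bad, good):
    # A plain module-level function pickles cleanly: unlike the bound
    # method, it carries no client (and hence no thread locks) with it.
    ldata = bad.copy()
    # ... the "magic" that updates ldata using params['schd'] goes here ...
    return score(test=ldata, correct=good)

cli = Client('myhostname:8786')
params_list = [make_params(i) for i in range(100)]
futures = [cli.submit(run_trial, p, bad_data_example, good_data)
           for p in params_list]
```

If the data arrays are large, scattering them once with `cli.scatter` and submitting the resulting futures avoids re-serializing the same data for every task.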
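As for finding the minimum: sorting every `(params, score)` pair works, but `min` with a key is enough, and `as_completed` lets you keep only a running best instead of holding all results. A sketch, reusing `params_list` and `futures` from above (and assuming a `distributed` version that supports `as_completed(..., with_results=True)`):

```python
from operator import itemgetter
from distributed import as_completed

# Simple way: gather all scores, then take the minimum.
scores = cli.gather(futures)
best_params, best_score = min(zip(params_list, scores), key=itemgetter(1))

# Streaming alternative: consume results as they finish and keep a
# running best, so nothing needs to be sorted at the end.
by_key = {f.key: p for p, f in zip(params_list, futures)}
best_params, best_score = None, float('inf')
for future, result in as_completed(futures, with_results=True):
    if result < best_score:
        best_params, best_score = by_key[future.key], result
```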