U e5d~@sdddgZddlZddlZddlZddlZddlZddlZddlZddlZddlm Z ddl m Z ddl m Z m Z ddlmZd Zd Zd Zd ZeZd dZddZGdddeZGdddZddZGdddeZd+ddZddZGdd d eZ Gd!dde!Z"Gd"d#d#e!Z#e#Z$Gd$d%d%e#Z%Gd&d'd'e!Z&Gd(d)d)e&Z'Gd*dde"Z(dS),Pool ThreadPoolN)Empty)util) get_context TimeoutError)waitINITRUNCLOSE TERMINATEcCs tt|SN)listmapargsr,/usr/lib64/python3.8/multiprocessing/pool.pymapstar/srcCstt|d|dS)Nrr)r itertoolsstarmaprrrr starmapstar2src@seZdZddZddZdS)RemoteTracebackcCs ||_dSrtb)selfrrrr__init__:szRemoteTraceback.__init__cCs|jSrrrrrr__str__<szRemoteTraceback.__str__N)__name__ __module__ __qualname__rrrrrrr9src@seZdZddZddZdS)ExceptionWithTracebackcCs0tt|||}d|}||_d||_dS)Nz """ %s""") tracebackformat_exceptiontypejoinexcr)rr)rrrrr@s zExceptionWithTraceback.__init__cCst|j|jffSr) rebuild_excr)rrrrr __reduce__Esz!ExceptionWithTraceback.__reduce__N)r r!r"rr+rrrrr#?sr#cCst||_|Sr)r __cause__)r)rrrrr*Hs r*cs0eZdZdZfddZddZddZZS)MaybeEncodingErrorzVWraps possible unpickleable errors, so they can be safely sent through the socket.cs.t||_t||_tt||j|jdSr)reprr)valuesuperr-r)rr)r/ __class__rrrTs  zMaybeEncodingError.__init__cCsd|j|jfS)Nz(Error sending result: '%s'. Reason: '%s')r/r)rrrrrYszMaybeEncodingError.__str__cCsd|jj|fS)Nz<%s: %s>)r2r rrrr__repr__]szMaybeEncodingError.__repr__)r r!r"__doc__rrr3 __classcell__rrr1rr-Ps r-rFc Cs|dk r(t|tr|dks(td||j}|j}t|drR|j|j |dk rb||d}|dks~|r||krz |} Wn(t t fk rt dYqYnX| dkrt dq| \} } } } }zd| | |f}WnHtk r0}z(|r| tk rt||j}d|f}W5d}~XYnXz|| | |fWnRtk r}z2t||d}t d ||| | d|ffW5d}~XYnXd} } }} } }|d7}qft d |dS) NrzMaxtasks {!r} is not valid_writerrz)worker got EOFError or OSError -- exitingzworker got sentinel -- exitingTFz0Possible encoding error while sending result: %szworker exiting after %d tasks) isinstanceintAssertionErrorformatputgethasattrr6close_readerEOFErrorOSErrorrdebug Exception_helper_reraises_exceptionr# __traceback__r-)inqueueoutqueue initializerinitargsZmaxtaskswrap_exceptionr;r<Z completedtaskjobifuncrkwdsresultewrappedrrrworkerasN        $ rScCs|dS)z@Pickle-able helper function for use by _guarded_task_generation.Nr)ZexrrrrDsrDcs2eZdZdZddfdd ZfddZZS) _PoolCachez Class that implements a cache for the Pool class that will notify the pool management threads every time the cache is emptied. The notification is done by the use of a queue that is provided when instantiating the cache. Nnotifiercs||_tj||dSr)rVr0r)rrVrrOr1rrrsz_PoolCache.__init__cs t||s|jddSr)r0 __delitem__rVr;)ritemr1rrrWs z_PoolCache.__delitem__)r r!r"r4rrWr5rrr1rrTsrTc@seZdZdZdZeddZdLddZej e fd d Z d d Z d dZ eddZeddZddZeddZeddZddZddZdifddZdMdd ZdNd!d"ZdOd#d$Zd%d&ZdPd(d)ZdQd*d+Zdiddfd,d-ZdRd.d/ZdSd0d1ZedTd2d3Ze d4d5Z!ed6d7Z"ed8d9Z#ed:d;Z$dd?Z&d@dAZ'dBdCZ(edDdEZ)e dFdGZ*dHdIZ+dJdKZ,dS)UrzS Class which supports an async version of applying functions to arguments. TcOs |j||SrProcess)ctxrrOrrrrZsz Pool.ProcessNrcCsg|_t|_|pt|_|t|_|j|_ t |j d|_ ||_ ||_ ||_|dkrjtphd}|dkrztd|dk rt|std||_z |WnHtk r|jD]}|jdkr|q|jD] }|q؂YnX|}tjtj|j |j|j|j|j|j|j |j!|j |j|j |j"||j fd|_#d|j#_$t%|j#_|j#&tjtj'|j|j(|j!|j|j fd|_)d|j)_$t%|j)_|j)&tjtj*|j!|j+|j fd|_,d|j,_$t%|j,_|j,&t-j.||j/|j|j |j!|j|j |j#|j)|j,|j f dd|_0t%|_dS) NrUrz&Number of processes must be at least 1zinitializer must be a callabletargetrT)rZ exitpriority)1_poolr _stater_ctx _setup_queuesqueue SimpleQueue _taskqueue_change_notifierrT_cache_maxtasksperchild _initializer _initargsos cpu_count ValueErrorcallable TypeError _processes_repopulate_poolrCexitcode terminater(_get_sentinels threadingZThreadr_handle_workersrZ_inqueue _outqueue_wrap_exception_worker_handlerdaemonr start _handle_tasks _quick_put _task_handler_handle_results _quick_get_result_handlerrZFinalize_terminate_pool _terminate)r processesrHrImaxtasksperchildcontextp sentinelsrrrrs              z Pool.__init__cCs>|j|kr:|d|t|dt|dddk r:|jddS)Nz&unclosed running multiprocessing pool )sourcerf)r`ResourceWarninggetattrrfr;)rZ_warnr rrr__del__s  z Pool.__del__c Cs0|j}d|jd|jd|jdt|jd S)N<.z state=z pool_size=>)r2r!r"r`lenr_)rclsrrrr3sz Pool.__repr__cCs|jjg}|jjg}||Sr)rxr?rf)rZtask_queue_sentinelsZself_notifier_sentinelsrrrrts  zPool._get_sentinelscCsdd|DS)NcSsg|]}t|dr|jqS)sentinel)r=r).0rSrrr s z.Pool._get_worker_sentinels..rZworkersrrr_get_worker_sentinelsszPool._get_worker_sentinelscCsPd}ttt|D]6}||}|jdk rtd||d}||=q|S)zCleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. FNcleaning up worker %dT)reversedrangerrrrrBr()poolZcleanedrMrSrrr_join_exited_workerss zPool._join_exited_workersc Cs0||j|j|j|j|j|j|j|j|j |j Sr) _repopulate_pool_staticrarZrpr_rwrxrirjrhryrrrrrq.s zPool._repopulate_poolc Csft|t|D]P} ||t|||||| fd} | jdd| _d| _| || t dqdS)zBring the number of pool processes up to the specified number, for use after reaping workers which have exited. r\rZZ PoolWorkerTz added workerN) rrrSnamereplacer{r|appendrrB) r[rZrrrFrGrHrIrrJrMwrrrr7s zPool._repopulate_pool_staticc Cs*t|r&t|||||||||| dS)zEClean up any exited workers and start replacements for them. N)rrr) r[rZrrrFrGrHrIrrJrrr_maintain_poolJs  zPool._maintain_poolcCs4|j|_|j|_|jjj|_|jjj|_ dSr) rardrwrxr6sendr~r?recvrrrrrrbVs   zPool._setup_queuescCs|jtkrtddS)NzPool not running)r`r rmrrrr_check_running\s zPool._check_runningcCs||||S)zT Equivalent of `func(*args, **kwds)`. Pool must be running. ) apply_asyncr<)rrNrrOrrrapply`sz Pool.applycCs|||t|S)zx Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ) _map_asyncrr<rrNiterable chunksizerrrrgszPool.mapcCs|||t|S)z Like `map()` method but the elements of the `iterable` are expected to be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). )rrr<rrrrrnsz Pool.starmapcCs|||t|||S)z= Asynchronous version of `starmap()` method. )rrrrNrrcallbackerror_callbackrrr starmap_asyncvs zPool.starmap_asyncc csjz,d}t|D]\}}||||fifVqWn8tk rd}z||dt|fifVW5d}~XYnXdS)zProvides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.rN) enumeraterCrD)rZ result_jobrNrrMxrQrrr_guarded_task_generation~s zPool._guarded_task_generationrcCs||dkr:t|}|j||j|||jf|S|dkrPtd|t |||}t|}|j||jt ||jfdd|DSdS)zP Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. rzChunksize must be 1+, not {0:n}css|]}|D] }|Vq qdSrrrchunkrXrrr szPool.imap..N) r IMapIteratorrer;r_job _set_lengthrmr:r _get_tasksrrrNrrrP task_batchesrrrimaps4z Pool.imapcCs||dkr:t|}|j||j|||jf|S|dkrPtd|t |||}t|}|j||jt ||jfdd|DSdS)zL Like `imap()` method but ordering of results is arbitrary. rzChunksize must be 1+, not {0!r}css|]}|D] }|Vq qdSrrrrrrrsz&Pool.imap_unordered..N) rIMapUnorderedIteratorrer;rrrrmr:rrrrrrrimap_unordereds0zPool.imap_unorderedcCs6|t|||}|j|jd|||fgdf|S)z; Asynchronous version of `apply()` method. rN)r ApplyResultrer;r)rrNrrOrrrPrrrrs zPool.apply_asynccCs|||t|||S)z9 Asynchronous version of `map()` method. )rrrrrr map_asyncszPool.map_asyncc Cs|t|dst|}|dkrJtt|t|jd\}}|rJ|d7}t|dkrZd}t|||}t||t|||d} |j | | j ||df| S)zY Helper function to implement map, starmap and their async counterparts. __len__Nrrr) rr=rdivmodrr_rr MapResultrer;rr) rrNrZmapperrrrZextrarrPrrrrs,  zPool._map_asynccCs"t||d|s|q dS)N)timeout)r emptyr<)rchange_notifierrrrr_wait_for_updatess zPool._wait_for_updatesc Cspt}|jtks |rX|jtkrX|||||||| | | | ||| }|||q|dt ddS)Nzworker handler exiting) rucurrent_threadr`r r rrrr;rrB)rcache taskqueuer[rZrrrFrGrHrIrrJrrthreadZcurrent_sentinelsrrrrvs zPool._handle_workersc Cspt}t|jdD]\}}d}z|D]}|jtkrBtdqz ||Wq&tk r} zB|dd\} } z||  | d| fWnt k rYnXW5d} ~ XYq&Xq&|rtd|r|dnd} || dWqW q W5d}}} Xqtdz6td| dtd |D]} |dq.Wn t k r`td YnXtd dS) Nz'task handler found thread._state != RUNFzdoing set_length()rrztask handler got sentinelz/task handler sending sentinel to result handlerz(task handler sending sentinel to workersz/task handler got OSError when sending sentinelsztask handler exiting) ruriterr<r`r rrBrC_setKeyErrorr;rA) rr;rGrrrZtaskseqZ set_lengthrKrLrQidxrrrrr} sB         zPool._handle_tasksc Cst}z |}Wn$ttfk r6tdYdSX|jtkr`|jtksTt dtdq|dkrttdq|\}}}z|| ||Wnt k rYnXd}}}q|rR|jtkrRz |}Wn$ttfk rtdYdSX|dkrtdq|\}}}z|| ||Wnt k rBYnXd}}}qt |drtdz,t dD]}|jsq|qrWnttfk rYnXtd t||jdS) Nz.result handler got EOFError/OSError -- exitingzThread not in TERMINATEz,result handler found thread._state=TERMINATEzresult handler got sentinelz&result handler ignoring extra sentinelr?z"ensuring that outqueue is not full z7result handler exiting: len(cache)=%s, thread._state=%s)rurrAr@rrBr`r r r9rrr=rr?pollr)rGr<rrrKrLrMobjrrrr:s^               zPool._handle_resultsccs0t|}tt||}|s dS||fVqdSr)rtuplerislice)rNitsizerrrrrvs zPool._get_taskscCs tddS)Nz:pool objects cannot be passed between processes or pickled)NotImplementedErrorrrrrr+szPool.__reduce__cCs2td|jtkr.t|_t|j_|jddS)Nz closing pool)rrBr`r r rzrfr;rrrrr>s   z Pool.closecCstdt|_|dS)Nzterminating pool)rrBr r`rrrrrrss zPool.terminatecCsjtd|jtkrtdn|jttfkr4td|j|j |j |j D] }|qXdS)Nz joining poolzPool is still runningzIn unknown state) rrBr`r rmr r rzr(rrr_)rrrrrr(s       z Pool.joincCs@td|j|r<|jr<|jt dqdS)Nz7removing tasks from inqueue until task handler finishedr) rrBZ_rlockacquireis_aliver?rrtimesleep)rF task_handlerrrrr_help_stuff_finishs    zPool._help_stuff_finishc CsXtdt|_|dt|_td|||t||sXt| dkrXtdt|_|d|dtdt |k r| |rt |ddrtd|D]} | j dkr| qtdt |k r| td t |k r| |rTt |ddrTtd |D](} | r*td | j| q*dS) Nzfinalizing poolz&helping task handler/workers to finishrz.Cannot have cache with result_hander not alivezjoining worker handlerrszterminating workerszjoining task handlerzjoining result handlerzjoining pool workersr)rrBr r`r;rrrr9rurr(r=rrrspid) rrrFrGrrZworker_handlerrZresult_handlerrrrrrrsB               zPool._terminate_poolcCs ||Sr)rrrrr __enter__szPool.__enter__cCs |dSr)rs)rexc_typeZexc_valZexc_tbrrr__exit__sz Pool.__exit__)NNrNN)N)N)NNN)r)r)NNN)NNN)N)-r r!r"r4ry staticmethodrZrwarningswarnr rr3rtrrrqrrrbrrrrrrrrrrrr classmethodrvr}rrr+r>rsr(rrrrrrrrrsx  P                - ;    5c@s@eZdZddZddZddZddd Zdd d Zd d ZdS)rcCs>||_t|_tt|_|j|_||_||_ ||j|j<dSr) r_ruZEvent_eventnext job_counterrrg _callback_error_callback)rrrrrrrrs  zApplyResult.__init__cCs |jSr)rZis_setrrrrreadyszApplyResult.readycCs|std||jS)Nz{0!r} not ready)rrmr:_successrrrr successfulszApplyResult.successfulNcCs|j|dSr)rr rrrrrr szApplyResult.waitcCs,|||st|jr"|jS|jdSr)r rrr_valuerrrrr<s  zApplyResult.getcCsZ|\|_|_|jr$|jr$||j|jr<|js<||j|j|j|j=d|_dSr) rrrrrsetrgrr_rrMrrrrrs       zApplyResult._set)N)N) r r!r"rrrr r<rrrrrrs    rc@seZdZddZddZdS)rcCshtj||||dd|_dg||_||_|dkrNd|_|j|j|j =n||t |||_dS)NrTr) rrrr _chunksize _number_leftrrrgrbool)rrrlengthrrrrrrs    zMapResult.__init__cCs|jd8_|\}}|rv|jrv||j||j|d|j<|jdkr|jrZ||j|j|j=|jd|_ nL|s|jrd|_||_|jdkr|j r| |j|j|j=|jd|_ dS)NrrF) rrrrrrgrrrr_r)rrMZsuccess_resultsuccessrPrrrr$s&          zMapResult._setN)r r!r"rrrrrrrs rc@s:eZdZddZddZd ddZeZdd Zd d ZdS) rcCsT||_tt|_tt|_|j|_t |_ d|_ d|_ i|_||j|j<dS)Nr)r_ruZ ConditionZLock_condrrrrg collectionsdeque_items_index_length _unsorted)rrrrrrBs  zIMapIterator.__init__cCs|Srrrrrr__iter__MszIMapIterator.__iter__Nc Cs|jz|j}Wnztk r|j|jkr>d|_td|j|z|j}Wn2tk r|j|jkrd|_tdt dYnXYnXW5QRX|\}}|r|S|dSr) rrpopleft IndexErrorrrr_ StopIterationr r)rrrXrr/rrrrPs&   zIMapIterator.nextc Cs|j|j|krn|j||jd7_|j|jkrb|j|j}|j||jd7_q,|jn ||j|<|j|jkr|j|j =d|_ W5QRXdSNr) rrrrrpopnotifyrrgrr_rrrrrhs        zIMapIterator._setc CsB|j2||_|j|jkr4|j|j|j=d|_W5QRXdSr)rrrrrgrr_)rrrrrrys    zIMapIterator._set_length)N) r r!r"rrr__next__rrrrrrr@s   rc@seZdZddZdS)rc CsV|jF|j||jd7_|j|j|jkrH|j|j=d|_W5QRXdSr) rrrrrrrgrr_rrrrrs    zIMapUnorderedIterator._setN)r r!r"rrrrrrsrc@sVeZdZdZeddZdddZdd Zd d Zed d Z eddZ ddZ dS)rFcOsddlm}|||S)NrrY)ZdummyrZ)r[rrOrZrrrrZs zThreadPool.ProcessNrcCst||||dSr)rr)rrrHrIrrrrszThreadPool.__init__cCs,t|_t|_|jj|_|jj|_dSr)rcrdrwrxr;r~r<rrrrrrbs   zThreadPool._setup_queuescCs |jjgSr)rfr?rrrrrtszThreadPool._get_sentinelscCsgSrrrrrrrsz ThreadPool._get_worker_sentinelscCsFz|jddqWntjk r(YnXt|D]}|dq2dS)NF)block)r<rcrrr;)rFrrrMrrrrs  zThreadPool._help_stuff_finishcCst|dSr)rr)rrrrrrrrszThreadPool._wait_for_updates)NNr) r r!r"ryrrZrrbrtrrrrrrrrs    )NrNF))__all__rrrkrcrurr%rrr$rrrZ connectionr r r r r countrrrrCrr#r*r-rSrDdictrTobjectrrZ AsyncResultrrrrrrrr sN     -=)+E