a DOg+@s`ddgZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl m Z ddl m Z m Z ddlmZdZd Zd Zd ZeZd d ZddZGdddeZGdddZddZGdddeZd*ddZddZGdddeZ Gd dde!Z"Gd!d"d"e!Z#e#Z$Gd#d$d$e#Z%Gd%d&d&e!Z&Gd'd(d(e&Z'Gd)dde"Z(dS)+Pool ThreadPoolN)util) get_context TimeoutError)waitINITRUNCLOSE TERMINATEcCs tt|SN)listmapargsr9/opt/alt/python39/lib64/python3.9/multiprocessing/pool.pymapstar/srcCstt|d|dS)Nrr)r itertoolsstarmaprrrr starmapstar2src@seZdZddZddZdS)RemoteTracebackcCs ||_dSr tb)selfrrrr__init__:szRemoteTraceback.__init__cCs|jSr rrrrr__str__<szRemoteTraceback.__str__N)__name__ __module__ __qualname__rrrrrrr9src@seZdZddZddZdS)ExceptionWithTracebackcCs0tt|||}d|}||_d||_dS)Nz """ %s""") tracebackformat_exceptiontypejoinexcr)rr(rrrrr@s zExceptionWithTraceback.__init__cCst|j|jffSr ) rebuild_excr(rrrrr __reduce__Esz!ExceptionWithTraceback.__reduce__N)rr r!rr*rrrrr"?sr"cCst||_|Sr )r __cause__)r(rrrrr)Hs r)cs0eZdZdZfddZddZddZZS)MaybeEncodingErrorzVWraps possible unpickleable errors, so they can be safely sent through the socket.cs.t||_t||_tt||j|jdSr )reprr(valuesuperr,r)rr(r. __class__rrrTs  zMaybeEncodingError.__init__cCsd|j|jfS)Nz(Error sending result: '%s'. Reason: '%s')r.r(rrrrrYszMaybeEncodingError.__str__cCsd|jj|fS)Nz<%s: %s>)r1rrrrr__repr__]szMaybeEncodingError.__repr__)rr r!__doc__rrr2 __classcell__rrr0rr,Ps r,rFc Cs|dur(t|tr|dks(td||j}|j}t|drR|j|j |durb||d}|dus~|r||krz |} Wn&t t fyt dYqYn0| durt dq| \} } } } }zd| | i|f}WnJty4}z0|r| turt||j}d|f}WYd}~n d}~00z|| | |fWnTty}z:t||d}t d ||| | d|ffWYd}~n d}~00d} } }} } }|d7}qft d |dS) NrzMaxtasks {!r} is not valid_writerrz)worker got EOFError or OSError -- exitingzworker got sentinel -- exitingTFz0Possible encoding error while sending result: %szworker exiting after %d tasks) isinstanceintAssertionErrorformatputgethasattrr5close_readerEOFErrorOSErrorrdebug Exception_helper_reraises_exceptionr" __traceback__r,)inqueueoutqueue initializerinitargsZmaxtaskswrap_exceptionr:r;Z completedtaskjobifuncrkwdsresultewrappedrrrworkerasN        ( rRcCs|dS)z@Pickle-able helper function for use by _guarded_task_generation.Nr)ZexrrrrCsrCcs2eZdZdZddfdd ZfddZZS) _PoolCachez Class that implements a cache for the Pool class that will notify the pool management threads every time the cache is emptied. The notification is done by the use of a queue that is provided when instantiating the cache. Nnotifiercs||_tj|i|dSr )rUr/r)rrUrrNr0rrrsz_PoolCache.__init__cs t||s|jddSr )r/ __delitem__rUr:)ritemr0rrrVs z_PoolCache.__delitem__)rr r!r3rrVr4rrr0rrSsrSc@seZdZdZdZeddZdLddZej e fd d Z d d Z d dZ eddZeddZddZeddZeddZddZddZdifddZdMdd ZdNd!d"ZdOd#d$Zd%d&ZdPd(d)ZdQd*d+Zdiddfd,d-ZdRd.d/ZdSd0d1ZedTd2d3Ze d4d5Z!ed6d7Z"ed8d9Z#ed:d;Z$dd?Z&d@dAZ'dBdCZ(edDdEZ)e dFdGZ*dHdIZ+dJdKZ,dS)UrzS Class which supports an async version of applying functions to arguments. TcOs|j|i|Sr Process)ctxrrNrrrrYsz Pool.ProcessNrcCsg|_t|_|pt|_|t|_|j|_ t |j d|_ ||_ ||_ ||_|durjtphd}|dkrztd|durt|std||_z |WnFty|jD]}|jdur|q|jD] }|qւYn0|}tjtj|j |j|j|j|j|j|j |j!|j |j|j |j"||j fd|_#d|j#_$t%|j#_|j#&tjtj'|j|j(|j!|j|j fd|_)d|j)_$t%|j)_|j)&tjtj*|j!|j+|j fd|_,d|j,_$t%|j,_|j,&t-j.||j/|j|j |j!|j|j |j#|j)|j,|j f dd|_0t%|_dS) NrTrz&Number of processes must be at least 1zinitializer must be a callabletargetrT)rZ exitpriority)1_poolr _stater_ctx _setup_queuesqueue SimpleQueue _taskqueue_change_notifierrS_cache_maxtasksperchild _initializer _initargsos cpu_count ValueErrorcallable TypeError _processes_repopulate_poolrBexitcode terminater'_get_sentinels threadingZThreadr_handle_workersrY_inqueue _outqueue_wrap_exception_worker_handlerdaemonr start _handle_tasks _quick_put _task_handler_handle_results _quick_get_result_handlerrZFinalize_terminate_pool _terminate)r processesrGrHmaxtasksperchildcontextp sentinelsrrrrs~                  z Pool.__init__cCs>|j|kr:|d|t|dt|dddur:|jddS)Nz&unclosed running multiprocessing pool )sourcere)r_ResourceWarninggetattrrer:)rZ_warnr rrr__del__s   z Pool.__del__c Cs0|j}d|jd|jd|jdt|jd S)N<.z state=z pool_size=>)r1r r!r_lenr^)rclsrrrr2s z Pool.__repr__cCs |jjg}|jjg}g||Sr )rwr>re)rZtask_queue_sentinelsZself_notifier_sentinelsrrrrss  zPool._get_sentinelscCsdd|DS)NcSsg|]}t|dr|jqS)sentinel)r<r).0rRrrr s z.Pool._get_worker_sentinels..rZworkersrrr_get_worker_sentinelsszPool._get_worker_sentinelscCsPd}ttt|D]6}||}|jdurtd||d}||=q|S)zCleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. FNcleaning up worker %dT)reversedrangerrqrrAr')poolZcleanedrLrRrrr_join_exited_workerss zPool._join_exited_workersc Cs0||j|j|j|j|j|j|j|j|j |j Sr ) _repopulate_pool_staticr`rYror^rvrwrhrirgrxrrrrrp.s zPool._repopulate_poolc Csft|t|D]P} ||t|||||| fd} | jdd| _d| _| || t dqdS)zBring the number of pool processes up to the specified number, for use after reaping workers which have exited. r[rYZ PoolWorkerTz added workerN) rrrRnamereplacerzr{appendrrA) rZrYrrrErFrGrHrrIrLwrrrr7s zPool._repopulate_pool_staticc Cs*t|r&t|||||||||| dS)zEClean up any exited workers and start replacements for them. N)rrr) rZrYrrrErFrGrHrrIrrr_maintain_poolJs   zPool._maintain_poolcCs4|j|_|j|_|jjj|_|jjj|_ dSr ) r`rcrvrwr5sendr}r>recvrrrrrraVs   zPool._setup_queuescCs|jtkrtddS)NzPool not running)r_r rlrrrr_check_running\s zPool._check_runningcCs||||S)zT Equivalent of `func(*args, **kwds)`. Pool must be running. ) apply_asyncr;)rrMrrNrrrapply`sz Pool.applycCs|||t|S)zx Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ) _map_asyncrr;rrMiterable chunksizerrrrgszPool.mapcCs|||t|S)z Like `map()` method but the elements of the `iterable` are expected to be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). )rrr;rrrrrnsz Pool.starmapcCs|||t|||S)z= Asynchronous version of `starmap()` method. )rrrrMrrcallbackerror_callbackrrr starmap_asyncvs zPool.starmap_asyncc cslz,d}t|D]\}}||||fifVqWn:tyf}z"||dt|fifVWYd}~n d}~00dS)zProvides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.rN) enumeraterBrC)rZ result_jobrMrrLxrPrrr_guarded_task_generation~s zPool._guarded_task_generationrcCs||dkr:t|}|j||j|||jf|S|dkrPtd|t |||}t|}|j||jt ||jfdd|DSdS)zP Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. rzChunksize must be 1+, not {0:n}css|]}|D] }|Vq qdSr rrchunkrWrrr zPool.imap..N) r IMapIteratorrdr:r_job _set_lengthrlr9r _get_tasksrrrMrrrO task_batchesrrrimaps4z Pool.imapcCs||dkr:t|}|j||j|||jf|S|dkrPtd|t |||}t|}|j||jt ||jfdd|DSdS)zL Like `imap()` method but ordering of results is arbitrary. rzChunksize must be 1+, not {0!r}css|]}|D] }|Vq qdSr rrrrrrrz&Pool.imap_unordered..N) rIMapUnorderedIteratorrdr:rrrrlr9rrrrrrrimap_unordereds0zPool.imap_unorderedcCs6|t|||}|j|jd|||fgdf|S)z; Asynchronous version of `apply()` method. rN)r ApplyResultrdr:r)rrMrrNrrrOrrrrs zPool.apply_asynccCs|||t|||S)z9 Asynchronous version of `map()` method. )rrrrrr map_asyncszPool.map_asyncc Cs|t|dst|}|durJtt|t|jd\}}|rJ|d7}t|dkrZd}t|||}t||t|||d} |j | | j ||df| S)zY Helper function to implement map, starmap and their async counterparts. __len__Nrrr) rr<rdivmodrr^rr MapResultrdr:rr) rrMrZmapperrrrZextrarrOrrrrs,  zPool._map_asynccCs"t||d|s|q dS)N)timeout)remptyr;)rchange_notifierrrrr_wait_for_updatess zPool._wait_for_updatesc Cstt}|jtks |r\|jtkr\|||||||| | | | g||| }|||q|dt ddS)Nzworker handler exiting) rtcurrent_threadr_r r rrrr:rrA)rcache taskqueuerZrYrrrErFrGrHrrIrrthreadZcurrent_sentinelsrrrrus zPool._handle_workersc Cst}t|jdD] \}}d}z|D]}|jtkrDtdqz ||Wq(ty} zH|dd\} } z||  | d| fWnt yYn0WYd} ~ q(d} ~ 00q(|rtd|r|dnd} || dWd}}} qWd}}} q,Wd}}} qd}}} 0qtdz6td| dtd |D]} |dqPWnt ytd Yn0td dS) Nz'task handler found thread._state != RUNFzdoing set_length()rrztask handler got sentinelz/task handler sending sentinel to result handlerz(task handler sending sentinel to workersz/task handler got OSError when sending sentinelsztask handler exiting) rtriterr;r_r rrArB_setKeyErrorr:r@) rr:rFrrrZtaskseqZ set_lengthrJrPrKidxrrrrr| sJ            zPool._handle_tasksc Cst}z |}Wn"ttfy4tdYdS0|jtkrLtdq|dur`tdq|\}}}z||||Wnt yYn0d}}}q|r6|jt kr6z |}Wn"ttfytdYdS0|durtdq|\}}}z||||Wnt y&Yn0d}}}qt |drtdz,t dD]}|j slqv|qVWnttfyYn0tdt||jdS) Nz.result handler got EOFError/OSError -- exitingz,result handler found thread._state=TERMINATEzresult handler got sentinelz&result handler ignoring extra sentinelr>z"ensuring that outqueue is not full z7result handler exiting: len(cache)=%s, thread._state=%s)rtrr@r?rrAr_r rrr r<rr>pollr)rFr;rrrJrKrLobjrrrr:sZ                zPool._handle_resultsccs0t|}tt||}|s dS||fVqdSr )rtuplerislice)rMitsizerrrrrvs zPool._get_taskscCs tddS)Nz:pool objects cannot be passed between processes or pickled)NotImplementedErrorrrrrr*szPool.__reduce__cCs2td|jtkr.t|_t|j_|jddS)Nz closing pool)rrAr_r r ryrer:rrrrr=s   z Pool.closecCstdt|_|dS)Nzterminating pool)rrAr r_rrrrrrrs zPool.terminatecCsjtd|jtkrtdn|jttfvr4td|j|j |j |j D] }|qXdS)Nz joining poolzPool is still runningzIn unknown state) rrAr_r rlr r ryr'r~rr^)rrrrrr's       z Pool.joincCs@td|j|r<|jr<|jt dqdS)Nz7removing tasks from inqueue until task handler finishedr) rrAZ_rlockacquireis_aliver>rrtimesleep)rE task_handlerrrrr_help_stuff_finishs    zPool._help_stuff_finishc CsXtdt|_|dt|_td|||t||sXt| dkrXtdt|_|d|dtdt |ur| |rt |ddrtd|D]} | j dur| qtdt |ur| td t |ur| |rTt |ddrTtd |D](} | r*td | j| q*dS) Nzfinalizing poolz&helping task handler/workers to finishrz.Cannot have cache with result_hander not alivezjoining worker handlerrrzterminating workerszjoining task handlerzjoining result handlerzjoining pool workersr)rrAr r_r:rrrr8rtrr'r<rqrrpid) rrrErFrrZworker_handlerrZresult_handlerrrrrrrsB               zPool._terminate_poolcCs ||Sr )rrrrr __enter__szPool.__enter__cCs |dSr )rr)rexc_typeZexc_valZexc_tbrrr__exit__sz Pool.__exit__)NNrNN)N)N)NNN)r)r)NNN)NNN)N)-rr r!r3rx staticmethodrYrwarningswarnr rr2rsrrrprrrarrrrrrrrrrrr classmethodrur|rrr*r=rrr'rrrrrrrrrsv  P                - ;    5c@sJeZdZddZddZddZddd Zdd d Zd d Ze e j Z dS)rcCs>||_t|_tt|_|j|_||_||_ ||j|j<dSr ) r^rtZEvent_eventnext job_counterrrf _callback_error_callback)rrrrrrrrs  zApplyResult.__init__cCs |jSr )rZis_setrrrrreadyszApplyResult.readycCs|std||jS)Nz{0!r} not ready)rrlr9_successrrrr successfulszApplyResult.successfulNcCs|j|dSr )rrrrrrrrszApplyResult.waitcCs,|||st|jr"|jS|jdSr )rrrr_valuerrrrr;s  zApplyResult.getcCsZ|\|_|_|jr$|jr$||j|jr<|js<||j|j|j|j=d|_dSr ) rrrrrsetrfrr^rrLrrrrrs       zApplyResult._set)N)N) rr r!rrrrr;rrtypes GenericAlias__class_getitem__rrrrrs    rc@seZdZddZddZdS)rcCshtj||||dd|_dg||_||_|dkrNd|_|j|j|j =n||t |||_dS)NrTr) rrrr _chunksize _number_leftrrrfrbool)rrrlengthrrrrrrs    zMapResult.__init__cCs|jd8_|\}}|rv|jrv||j||j|d|j<|jdkr|jrZ||j|j|j=|jd|_ nL|s|jrd|_||_|jdkr|j r| |j|j|j=|jd|_ dS)NrrF) rrrrrrfrrrr^r)rrLZsuccess_resultsuccessrOrrrr&s&          zMapResult._setN)rr r!rrrrrrrs rc@s:eZdZddZddZd ddZeZdd Zd d ZdS) rcCsT||_tt|_tt|_|j|_t |_ d|_ d|_ i|_||j|j<dS)Nr)r^rtZ ConditionZLock_condrrrrf collectionsdeque_items_index_length _unsorted)rrrrrrDs  zIMapIterator.__init__cCs|Sr rrrrr__iter__OszIMapIterator.__iter__Nc Cs|jz|j}Wnvty|j|jkrrrrrrsszThreadPool._get_sentinelscCsgSr rrrrrrsz ThreadPool._get_worker_sentinelscCsDz|jddqWntjy&Yn0t|D]}|dq0dS)NF)block)r;rbZEmptyrr:)rErrrLrrrrs  zThreadPool._help_stuff_finishcCst|dSr )rr)rrrrrrrrszThreadPool._wait_for_updates)NNr) rr r!rxrrYrrarsrrrrrrrrs    )NrNF))__all__rrrjrbrtrr$rrr#rrrZ connectionrr r r r countrrrrBrr"r)r,rRrCdictrSobjectrrZ AsyncResultrrrrrrrr sN    -=++E