o ?Og@sdddgZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl m Z ddl m Z m Z ddlmZdZd Zd Zd ZeZd d ZddZGdddeZGdddZddZGdddeZ  d*ddZddZGdddeZ Gd dde!Z"Gd!d"d"e!Z#e#Z$Gd#d$d$e#Z%Gd%d&d&e!Z&Gd'd(d(e&Z'Gd)dde"Z(dS)+Pool ThreadPoolN)util) get_context TimeoutError)waitINITRUNCLOSE TERMINATEcCs tt|SN)listmapargsr;/opt/alt/python310/lib64/python3.10/multiprocessing/pool.pymapstar/ rcCstt|d|dS)Nrr)r itertoolsstarmaprrrr starmapstar2src@eZdZddZddZdS)RemoteTracebackcCs ||_dSr tb)selfrrrr__init__: zRemoteTraceback.__init__cCs|jSr rrrrr__str__<szRemoteTraceback.__str__N)__name__ __module__ __qualname__rr!rrrrr9s rc@r)ExceptionWithTracebackcCs0tt|||}d|}||_d||_dS)Nz """ %s""") tracebackformat_exceptiontypejoinexcr)rr+rrrrr@s zExceptionWithTraceback.__init__cCst|j|jffSr ) rebuild_excr+rr rrr __reduce__Ez!ExceptionWithTraceback.__reduce__N)r"r#r$rr-rrrrr%?s r%cCst||_|Sr )r __cause__)r+rrrrr,Hs r,cs0eZdZdZfddZddZddZZS)MaybeEncodingErrorzVWraps possible unpickleable errors, so they can be safely sent through the socket.cs.t||_t||_tt||j|jdSr )reprr+valuesuperr0r)rr+r2 __class__rrrTs  zMaybeEncodingError.__init__cCsd|j|jfS)Nz(Error sending result: '%s'. Reason: '%s')r2r+r rrrr!YszMaybeEncodingError.__str__cCsd|jj|fS)Nz<%s: %s>)r5r"r rrr__repr__]r.zMaybeEncodingError.__repr__)r"r#r$__doc__rr!r6 __classcell__rrr4rr0Ps  r0rFc Cs|durt|tr |dkstd||j}|j}t|dr)|j|j |dur1||d}|dus=|r||krz|} Wnt t fyRt dYnw| dur]t dn| \} } } } }z d| | i|f}Wn"ty}z|r| turt||j}d|f}WYd}~nd}~wwz || | |fWn)ty}zt||d}t d ||| | d|ffWYd}~nd}~wwd} } }} } }|d7}|dus=|r||ks=t d |dS) NrzMaxtasks {!r} is not valid_writerrz)worker got EOFError or OSError -- exitingzworker got sentinel -- exitingTFz0Possible encoding error while sending result: %szworker exiting after %d tasks) isinstanceintAssertionErrorformatputgethasattrr9close_readerEOFErrorOSErrorrdebug Exception_helper_reraises_exceptionr% __traceback__r0)inqueueoutqueue initializerinitargsZmaxtaskswrap_exceptionr>r?Z completedtaskjobifuncrkwdsresultewrappedrrrworkerasX        rVcCs|)z@Pickle-able helper function for use by _guarded_task_generation.r)ZexrrrrGrGcs2eZdZdZddfdd ZfddZZS) _PoolCachez Class that implements a cache for the Pool class that will notify the pool management threads every time the cache is emptied. The notification is done by the use of a queue that is provided when instantiating the cache. Nnotifiercs||_tj|i|dSr )rZr3r)rrZrrRr4rrrsz_PoolCache.__init__cs$t||s|jddSdSr )r3 __delitem__rZr>)ritemr4rrr[s z_PoolCache.__delitem__)r"r#r$r7rr[r8rrr4rrXsrXc@seZdZdZdZeddZ  dLddZej e fd d Z d d Z d dZ eddZeddZddZeddZeddZddZddZdifddZdMdd ZdMd!d"Z  dNd#d$Zd%d&ZdOd(d)ZdOd*d+Zdiddfd,d-Z  dNd.d/Z  dNd0d1ZedMd2d3Ze d4d5Z!ed6d7Z"ed8d9Z#ed:d;Z$dd?Z&d@dAZ'dBdCZ(edDdEZ)e dFdGZ*dHdIZ+dJdKZ,dS)PrzS Class which supports an async version of applying functions to arguments. TcOs|j|i|Sr Process)ctxrrRrrrr^sz Pool.ProcessNrcCs0g|_t|_|p t|_|t|_|j|_ t |j d|_ ||_ ||_ ||_|dur5tp4d}|dkr=td|durNt|trJ|dkrNtd|durZt|sZtd||_z|Wn!ty|jD] }|jdurx|qm|jD]}|q|w|}tjtj |j |j|j|j!|j|j|j"|j#|j |j|j |j$||j fd|_%d|j%_&t'|j%_|j%(tjtj)|j|j*|j#|j|j fd|_+d|j+_&t'|j+_|j+(tjtj,|j#|j-|j fd|_.d|j._&t'|j._|j.(t/j0||j1|j|j"|j#|j|j |j%|j+|j.|j f d d |_2t'|_dS) NrYrz&Number of processes must be at least 1rz/maxtasksperchild must be a positive int or Nonezinitializer must be a callabletargetrT)rZ exitpriority)3_poolr _stater_ctx _setup_queuesqueue SimpleQueue _taskqueue_change_notifierrX_cache_maxtasksperchild _initializer _initargsos cpu_count ValueErrorr:r;callable TypeError _processes_repopulate_poolrFexitcode terminater*_get_sentinels threadingZThreadr_handle_workersr^_inqueue _outqueue_wrap_exception_worker_handlerdaemonr start _handle_tasks _quick_put _task_handler_handle_results _quick_get_result_handlerrZFinalize_terminate_pool _terminate)r processesrKrLmaxtasksperchildcontextp sentinelsrrrrs                  z Pool.__init__cCsF|j|kr|d|t|dt|dddur!|jddSdSdS)Nz&unclosed running multiprocessing pool )sourcerj)rdResourceWarninggetattrrjr>)rZ_warnr rrr__del__ s  z Pool.__del__c Cs0|j}d|jd|jd|jdt|jd S)N<.z state=z pool_size=>)r5r#r$rdlenrc)rclsrrrr6s z Pool.__repr__cCs |jjg}|jjg}g||Sr )r|rBrj)rZtask_queue_sentinelsZself_notifier_sentinelsrrrrxs   zPool._get_sentinelscCsdd|DS)NcSsg|] }t|dr|jqS)sentinel)r@r).0rVrrr s z.Pool._get_worker_sentinels..rZworkersrrr_get_worker_sentinelsszPool._get_worker_sentinelscCsPd}ttt|D]}||}|jdur%td||d}||=q |S)zCleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. FNcleaning up worker %dT)reversedrangerrvrrEr*)poolZcleanedrPrVrrr_join_exited_workers!s zPool._join_exited_workersc Cs0||j|j|j|j|j|j|j|j|j |j Sr ) _repopulate_pool_staticrer^rtrcr{r|rmrnrlr}r rrrru1s zPool._repopulate_poolc Csft|t|D](} ||t|||||| fd} | jdd| _d| _| || t dqdS)zBring the number of pool processes up to the specified number, for use after reaping workers which have exited. r`r^Z PoolWorkerTz added workerN) rrrVnamereplacerrappendrrE) r_r^rrrIrJrKrLrrMrPwrrrr:s  zPool._repopulate_pool_staticc Cs.t|rt|||||||||| dSdS)zEClean up any exited workers and start replacements for them. N)rrr) r_r^rrrIrJrKrLrrMrrr_maintain_poolMs  zPool._maintain_poolcCs4|j|_|j|_|jjj|_|jjj|_ dSr ) rerhr{r|r9sendrrBrecvrr rrrrfYs   zPool._setup_queuescCs|jtkr tddS)NzPool not running)rdr rqr rrr_check_running_s zPool._check_runningcCs||||S)zT Equivalent of `func(*args, **kwds)`. Pool must be running. ) apply_asyncr?)rrQrrRrrrapplycsz Pool.applycC|||t|S)zx Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ) _map_asyncrr?rrQiterable chunksizerrrrjszPool.mapcCr)z Like `map()` method but the elements of the `iterable` are expected to be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). )rrr?rrrrrqsz Pool.starmapcC|||t|||S)z= Asynchronous version of `starmap()` method. )rrrrQrrcallbackerror_callbackrrr starmap_asyncys zPool.starmap_asyncc csnzd}t|D] \}}||||fifVqWdSty6}z||dt|fifVWYd}~dSd}~ww)zProvides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.rN) enumeraterFrG)rZ result_jobrQrrPxrTrrr_guarded_task_generations$zPool._guarded_task_generationrcC||dkrt|}|j||j|||jf|S|dkr(td|t |||}t|}|j||jt ||jfdd|DS)zP Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. rzChunksize must be 1+, not {0:n}cs|] }|D]}|VqqdSr rrchunkr\rrr zPool.imap..) r IMapIteratorrir>r_job _set_lengthrqr=r _get_tasksrrrQrrrS task_batchesrrrimaps4z Pool.imapcCr)zL Like `imap()` method but ordering of results is arbitrary. rzChunksize must be 1+, not {0!r}csrr rrrrrrrz&Pool.imap_unordered..) rIMapUnorderedIteratorrir>rrrrqr=rrrrrrrimap_unordereds0zPool.imap_unorderedcCs6|t|||}|j|jd|||fgdf|S)z; Asynchronous version of `apply()` method. rN)r ApplyResultrir>r)rrQrrRrrrSrrrrs zPool.apply_asynccCr)z9 Asynchronous version of `map()` method. )rrrrrr map_asyncszPool.map_asyncc Cs|t|ds t|}|dur%tt|t|jd\}}|r%|d7}t|dkr-d}t|||}t||t|||d} |j | | j ||df| S)zY Helper function to implement map, starmap and their async counterparts. __len__Nrrr) rr@rdivmodrrcrr MapResultrir>rr) rrQrZmapperrrrZextrarrSrrrrs,  zPool._map_asynccCs,t||d|s||r dSdS)N)timeout)remptyr?)rchange_notifierrrrr_wait_for_updatess zPool._wait_for_updatesc Cst}|jtks|r9|jtkr9|||||||| | | | g||| }||||jtks|r9|jtks|dt ddS)Nzworker handler exiting) rycurrent_threadrdr r rrrr>rrE)rcache taskqueuer_r^rrrIrJrKrLrrMrrthreadZcurrent_sentinelsrrrrzs  zPool._handle_workersc Cstt}t|jdD]z\}}d}zm|D]D}|jtkr!tdnTz||WqtyW} z$|dd\} } z ||  | d| fWn t yLYnwWYd} ~ qd} ~ ww|rmtd|re|dnd} || dWd}}} q Wd}}} n d}}} wtdztd| dtd |D]} |dqWnt ytd Ynwtd dS) Nz'task handler found thread._state != RUNFzdoing set_length()rrztask handler got sentinelz/task handler sending sentinel to result handlerz(task handler sending sentinel to workersz/task handler got OSError when sending sentinelsztask handler exiting) ryriterr?rdr rrErF_setKeyErrorr>rD) rr>rJrrrZtaskseqZ set_lengthrNrTrOidxrrrrrsN            zPool._handle_tasksc Cst} z|}WnttfytdYdSw|jtkr'tdn*|dur1tdn |\}}}z ||||Wn t yIYnwd}}}q|r|jt krz|}WnttfyntdYdSw|durytdqQ|\}}}z ||||Wn t yYnwd}}}|r|jt ksXt |drtdzt dD] }|j sn|qWn ttfyYnwtd t||jdS) Nrz.result handler got EOFError/OSError -- exitingz,result handler found thread._state=TERMINATEzresult handler got sentinelz&result handler ignoring extra sentinelrBz"ensuring that outqueue is not full z7result handler exiting: len(cache)=%s, thread._state=%s)ryrrDrCrrErdr rrr r@rrBpollr)rJr?rrrNrOrPobjrrrr=sl                   zPool._handle_resultsccs0t|} tt||}|sdS||fVqr )rtuplerislice)rQitsizerrrrrys zPool._get_taskscCstd)Nz:pool objects cannot be passed between processes or pickled)NotImplementedErrorr rrrr-szPool.__reduce__cCs6td|jtkrt|_t|j_|jddSdS)Nz closing pool)rrErdr r r~rjr>r rrrrAs  z Pool.closecCstdt|_|dS)Nzterminating pool)rrEr rdrr rrrrws  zPool.terminatecCshtd|jtkrtd|jttfvrtd|j|j |j |j D]}|q+dS)Nz joining poolzPool is still runningzIn unknown state) rrErdr rqr r r~r*rrrc)rrrrrr*s       z Pool.joincCs\td|j|r(|jr,|jt d|r*|jsdSdSdSdS)Nz7removing tasks from inqueue until task handler finishedr) rrEZ_rlockacquireis_aliverBrrtimesleep)rI task_handlerrrrr_help_stuff_finishs    "zPool._help_stuff_finishc CsVtdt|_|dt|_td|||t||s,t| dkr,tdt|_|d|dtdt |urH| |rdt |ddrdtd|D] } | j durc| qXtdt |urs| td t |ur| |rt |ddrtd |D]} | rtd | j| qdSdSdS) Nzfinalizing poolz&helping task handler/workers to finishrz.Cannot have cache with result_hander not alivezjoining worker handlerrwzterminating workerszjoining task handlerzjoining result handlerzjoining pool workersr)rrEr rdr>rrrr<ryrr*r@rvrwpid) rrrIrJrrZworker_handlerrZresult_handlerrrrrrrsJ              zPool._terminate_poolcCs ||Sr )rr rrr __enter__szPool.__enter__cCs |dSr )rw)rexc_typeZexc_valZexc_tbrrr__exit__rz Pool.__exit__)NNrNNr )NNN)r)-r"r#r$r7r} staticmethodr^rwarningswarnr rr6rxrrrurrrfrrrrrrrrrrrr classmethodrzrrrr-rArwr*rrrrrrrrrsx  S                - ;   5 c@sJeZdZddZddZddZddd Zdd d Zd d Ze e j Z dS)rcCs>||_t|_tt|_|j|_||_||_ ||j|j<dSr ) rcryZEvent_eventnext job_counterrrk _callback_error_callback)rrrrrrrrs  zApplyResult.__init__cCs |jSr )rZis_setr rrrreadyrzApplyResult.readycCs|s td||jS)Nz{0!r} not ready)rrqr=_successr rrr successfulszApplyResult.successfulNcCs|j|dSr )rrrrrrrrr.zApplyResult.waitcCs(|||s t|jr|jS|jr )rrrr_valuerrrrr?s zApplyResult.getcCsZ|\|_|_|jr|jr||j|jr|js||j|j|j|j=d|_dSr ) rrrrrsetrkrrcrrPrrrrrs        zApplyResult._setr ) r"r#r$rrrrr?rrtypes GenericAlias__class_getitem__rrrrrs     rc@r)rcCsjtj||||dd|_dg||_||_|dkr(d|_|j|j|j =dS||t |||_dS)NrTr) rrrr _chunksize _number_leftrrrkrbool)rrrlengthrrrrrrs   zMapResult.__init__cCs|jd8_|\}}|r>|jr>||j||j|d|j<|jdkr<|jr-||j|j|j=|jd|_ dSdS|sI|jrId|_||_|jdkrf|j rW| |j|j|j=|jd|_ dSdS)NrrF) rrrrrrkrrrrcr)rrPZsuccess_resultsuccessrSrrrr)s*            zMapResult._setN)r"r#r$rrrrrrrs rc@s:eZdZddZddZd ddZeZdd Zd d ZdS) rcCsT||_tt|_tt|_|j|_t |_ d|_ d|_ i|_||j|j<dS)Nr)rcryZ ConditionZLock_condrrrrk collectionsdeque_items_index_length _unsorted)rrrrrrGs  zIMapIterator.__init__cCs|Sr rr rrr__iter__RszIMapIterator.__iter__Nc Cs|jIz|j}Wn9tyD|j|jkrd|_td|j|z|j}WntyA|j|jkr>d|_tdt dwYnwWdn1sOwY|\}}|r\|S|r ) rr popleft IndexErrorr r rc StopIterationrr)rrr\rr2rrrrUs0     zIMapIterator.nextcCs|j\|j|kr<|j||jd7_|j|jvr6|j|j}|j||jd7_|j|jvs|jn||j|<|j|jkrW|j|j =d|_ WddSWddS1sbwYdSNr) rr r rr popnotifyr rkrrcrrrrrms"         "zIMapIterator._setcCsh|j'||_|j|jkr"|j|j|j=d|_WddSWddS1s-wYdSr )rr r rrkrrc)rrrrrr~s   "zIMapIterator._set_lengthr ) r"r#r$rrr__next__rrrrrrrEs  rc@seZdZddZdS)rcCs||j1|j||jd7_|j|j|jkr,|j|j=d|_WddSWddS1s7wYdSr) rr rr rr rkrrcrrrrrs    "zIMapUnorderedIterator._setN)r"r#r$rrrrrrs rc@sVeZdZdZeddZdddZdd Zd d Zed d Z eddZ ddZ dS)rFcOsddlm}||i|S)Nrr])Zdummyr^)r_rrRr^rrrr^s zThreadPool.ProcessNrcCst||||dSr )rr)rrrKrLrrrrszThreadPool.__init__cCs,t|_t|_|jj|_|jj|_dSr )rgrhr{r|r>rr?rr rrrrfs   zThreadPool._setup_queuescCs |jjgSr )rjrBr rrrrxrzThreadPool._get_sentinelscCsgSr rrrrrrrWz ThreadPool._get_worker_sentinelscCsBz |jddqtjyYnwt|D]}|dqdS)NTF)block)r?rgZEmptyrr>)rIrrrPrrrrs   zThreadPool._help_stuff_finishcCst|dSr )rr)rrrrrrrrszThreadPool._wait_for_updates)NNr) r"r#r$r}rr^rrfrxrrrrrrrrs     )NrNF))__all__rrrorgryrr'rrr&rrrZ connectionrr r r r countrrrrFrr%r,r0rVrGdictrXobjectrrZ AsyncResultrrrrrrrrsP     -@++E