o ?Og|@s dZddlZddlmZddlZddlZddlZddlm Z ddl Z ddl Z ddl m Z ddlZddlZddlZe ZdaGdddZd d Ze ed Zd ZGd ddeZGdddZddZGdddeZGdddeZGdddeZ Gddde Z!ddZ"ddZ#d-dd Z$d!d"Z%Gd#d$d$e j&Z'da(da)d%d&Z*d'd(Z+Gd)d*d*ej,Z-Gd+d,d,ej.Z/dS).z"Brian Quinlan (brian@sweetapp.com)N)_base)Queue)partialFc@s,eZdZddZddZddZddZd S) _ThreadWakeupcCsd|_tjdd\|_|_dS)NF)Zduplex)_closedmpZPipe_reader_writerselfr A/opt/alt/python310/lib64/python3.10/concurrent/futures/process.py__init__Csz_ThreadWakeup.__init__cCs(|jsd|_|j|jdSdSNT)rr closerr r r r rGs  z_ThreadWakeup.closecCs|js |jddSdS)N)rr Z send_bytesr r r r wakeupMsz_ThreadWakeup.wakeupcCs0|js|jr|j|jsdSdSdSN)rrZpollZ recv_bytesr r r r clearQs   z_ThreadWakeup.clearN)__name__ __module__ __qualname__rrrrr r r r rBs  rcCs@datt}|D]\}}|q |D]\}}|qdSr)_global_shutdownlist_threads_wakeupsitemsrjoin)r_ thread_wakeuptr r r _python_exitWs     r =c@eZdZddZddZdS)_RemoteTracebackcCs ||_dSrtb)r r&r r r rws z_RemoteTraceback.__init__cCs|jSrr%r r r r __str__ysz_RemoteTraceback.__str__N)rrrrr'r r r r r$vs r$c@r#)_ExceptionWithTracebackcCs8tt|||}d|}||_d|j_d||_dS)Nz """ %s""") tracebackformat_exceptiontyperexc __traceback__r&)r r-r&r r r r}s  z _ExceptionWithTraceback.__init__cCst|j|jffSr) _rebuild_excr-r&r r r r __reduce__sz"_ExceptionWithTraceback.__reduce__N)rrrrr0r r r r r(|s r(cCst||_|Sr)r$ __cause__)r-r&r r r r/s r/c@eZdZddZdS) _WorkItemcC||_||_||_||_dSr)futurefnargskwargs)r r5r6r7r8r r r r z_WorkItem.__init__Nrrrrr r r r r3 r3c@seZdZdddZdS) _ResultItemNcCs||_||_||_dSr)work_id exceptionresult)r r=r>r?r r r rs z_ResultItem.__init__NNr:r r r r r<sr<c@r2) _CallItemcCr4r)r=r6r7r8)r r=r6r7r8r r r rr9z_CallItem.__init__Nr:r r r r rAr;rAcs,eZdZ dfdd ZfddZZS) _SafeQueuercs&||_||_||_tj||ddS)N)ctx)pending_work_items shutdown_lockrsuperr)r max_sizerCrDrEr __class__r r rsz_SafeQueue.__init__cst|trHtt|||j}tdd||_ |j |j d}|j |jWdn1s5wY|durF|j|dSdSt||dS)Nz """ {}"""r)) isinstancerAr*r+r,r.r$formatrr1rDpopr=rErrr5 set_exceptionrF_on_queue_feeder_error)r eobjr& work_itemrHr r rNs  z!_SafeQueue._on_queue_feeder_error)r)rrrrrN __classcell__r r rHr rBsrBcgs. t|} tt||}|sdS|Vqr)ziptuple itertoolsislice) chunksize iterablesitchunkr r r _get_chunkssr[cs fdd|DS)Ncsg|]}|qSr r ).0r7r6r r sz"_process_chunk..r )r6rZr r]r _process_chunksr_c Csb z |t|||dWdSty0}zt||j}|t||dWYd}~dSd}~ww)N)r?r>r>)putr< BaseExceptionr(r.) result_queuer=r?r>rOr-r r r _sendback_results   rdc Cs |durz||WntytjjdddYdSw |jdd}|dur2|tdSz |j|j i|j }Wn ty^}zt ||j }t ||j|dWYd}~nd}~wwt ||j|d~~q)NzException in initializer:T)exc_infoblockr`)r?)rbrZLOGGERZcriticalgetraosgetpidr6r7r8r(r.rdr=) call_queuerc initializerinitargsZ call_itemrrOr-r r r _process_workers0     rocsneZdZ fddZddZddZddZd d Zd d Zd dZ ddZ ddZ ddZ ddZ ZS)_ExecutorManagerThreadcsf|j|_|j|_|j|jfdd}t|||_|j|_|j |_ |j |_ |j |_|j|_tdS)NcSs>tjd| |WddS1swYdS)Nz?Executor collected: triggering callback for QueueManager wakeup)rutildebugr)rrrEr r r weakref_cbs  "z3_ExecutorManagerThread.__init__..weakref_cb)_executor_manager_thread_wakeupr_shutdown_lockrEweakrefrefexecutor_reference _processes processes _call_queuerk _result_queuerc _work_idswork_ids_queue_pending_work_itemsrDrFr)r executorrsrHr r rs  z_ExecutorManagerThread.__init__cCs ||\}}}|r||dS|dur-||~|}|dur,|j~|rB|||j sB| dSqr) add_call_item_to_queuewait_result_broken_or_wakeupterminate_brokenprocess_result_itemrx_idle_worker_semaphorereleaseis_shutting_downflag_executor_shutting_downrDjoin_executor_internals)r result_item is_brokencauserr r r run:s(   z_ExecutorManagerThread.runcCs| |jrdSz |jjdd}Wn tjyYdSw|j|}|jr8|jj t ||j |j |j ddn|j|=qq)NTFrf)rkZfullr~rhqueueEmptyrDr5Zset_running_or_notify_cancelrarAr6r7r8)r r=rQr r r r_s(    z-_ExecutorManagerThread.add_call_item_to_queuec Cs|jj}|jj}||g}ddt|jD}tj||}d}d}d}||vrOz| }d}Wn"t yN} zt t | | | j}WYd} ~ n d} ~ ww||vrUd}|j |jWdn1shwY|||fS)NcSsg|]}|jqSr )Zsentinelr\pr r r r^szG_ExecutorManagerThread.wait_result_broken_or_wakeup..TF)rcrrrrzvaluesrZ connectionwaitZrecvrbr*r+r,r.rEr) r Z result_readerZ wakeup_readerZreadersZworker_sentinelsZreadyrrrrOr r r rvs,   z3_ExecutorManagerThread.wait_result_broken_or_wakeupcCszt|tr|j|}||js|dSdS|j|jd}|dur;|jr2|j |jdS|j |j dSdSr) rJintrzrLrrrDr=r>r5rMZ set_resultr?)r rrrQr r r rs  z*_ExecutorManagerThread.process_result_itemcCs|}tp |dup |jSr)rxr_shutdown_thread)r rr r r rs z'_ExecutorManagerThread.is_shutting_downcCs|}|durd|_d|_d}td}|dur$tdd|d|_|jD] \}}|j |~q)|j |j D]}|q?|dS)NzKA child process terminated abruptly, the process pool is not usable anymoreTz^A process in the process pool was terminated abruptly while the future was running or pending.z ''' r)z''')rx_brokenrBrokenProcessPoolr$rr1rDrr5rMrrzrZ terminater)r rrZbper=rQrr r r rs"    z'_ExecutorManagerThread.terminate_brokencCs|}|dur?d|_|jrAi}|jD] \}}|js"|||<q||_ z|jWn t j y8Ynwq'd|_dSdSdS)NTF) rxr_cancel_pending_futuresrDrr5Zcancelr~Z get_nowaitrr)r rZnew_pending_work_itemsr=rQr r r rs(  z2_ExecutorManagerThread.flag_executor_shutting_downc Cs|}d}||kr<|dkr>t||D]}z |jd|d7}Wqtjy/Ynw||kr@|dksdSdSdSdS)Nrr!)get_n_children_aliverangerkZ put_nowaitrZFull)r Zn_children_to_stopZn_sentinels_sentir r r shutdown_workerss    z'_ExecutorManagerThread.shutdown_workerscCsh||j|j|j |jWdn1s!wY|jD]}|q+dSr) rrkrZ join_threadrErrzrrr rr r r rs    z._ExecutorManagerThread.join_executor_internalscCstdd|jDS)Ncss|]}|VqdSr)Zis_aliverr r r sz>_ExecutorManagerThread.get_n_children_alive..)sumrzrr r r r rsz+_ExecutorManagerThread.get_n_children_alive)rrrrrrrrrrrrrrrRr r rHr rps +% & rpc Cstrtrttdazddl}Wn tydattwztd}Wn ttfy1YdSw|dkr8dS|dkr>dSd|att)NTrzxThis Python build lacks multiprocessing.synchronize, usually due to named semaphores being unavailable on this platform.SC_SEM_NSEMS_MAXz@system provides too few semaphores (%d available, 256 necessary)) _system_limits_checked_system_limitedNotImplementedErrorZmultiprocessing.synchronize ImportErrorrisysconfAttributeError ValueError)multiprocessingZ nsems_maxr r r _check_system_limitss0  rccs, |D]}||r|V|s qdSr)reverserL)iterableZelementr r r _chain_from_iterable_of_lists9s rc@s eZdZdS)rN)rrrr r r r rEsrcseZdZ  dddZddZddZd d Zd d Zd dZe j jj e_ dddfdd Z dddddZ e j j j e _ ZS)ProcessPoolExecutorNr cCsJ t|durtp d|_tjdkrtt|j|_n|dkr$tdtjdkr4|tkr4tdt||_|dur?t }||_ |j j dddk|_ |durXt|sXtd ||_||_d|_i|_d|_t|_td|_d|_d|_i|_d|_t|_|jt }t!||j |j|j|jd |_"d |j"_#|$|_%t&'|_(dS) Nr!Zwin32rz"max_workers must be greater than 0zmax_workers must be <= F)Z allow_noneforkzinitializer must be a callable)rGrCrDrErT))rri cpu_count _max_workerssysplatformmin_MAX_WINDOWS_WORKERSrrZ get_context _mp_contextZget_start_method#_safe_to_dynamically_spawn_childrencallable TypeError _initializer _initargs_executor_manager_threadryr threadingZLockruZ Semaphorerr _queue_countrrrrtEXTRA_QUEUED_CALLSrBr{Z _ignore_epipeZ SimpleQueuer|rrr})r Z max_workersZ mp_contextrlrmZ queue_sizer r r rMs\       zProcessPoolExecutor.__init__cCs@|jdur|js |t||_|j|jt|j<dSdSr)rr_launch_processesrpstartrtrr r r r _start_executor_manager_threads    z2ProcessPoolExecutor._start_executor_manager_threadcCs6|jjddr dSt|j}||jkr|dSdS)NF)Zblocking)racquirelenryr_spawn_process)r Z process_countr r r _adjust_process_counts    z)ProcessPoolExecutor._adjust_process_countcCs$tt|j|jD]}|q dSr)rrryrr)r rr r r rs z%ProcessPoolExecutor._launch_processescCs8|jjt|j|j|j|jfd}|||j|j <dS)N)targetr7) rZProcessror{r|rrrrypidrr r r rsz"ProcessPoolExecutor._spawn_processcOs|jN|jr t|j|jrtdtrtdt}t||||}||j |j <|j |j |j d7_ |j |jrD|||WdS1sTwYdS)Nz*cannot schedule new futures after shutdownz6cannot schedule new futures after interpreter shutdownr!)rurrr RuntimeErrorrrZFuturer3rrr}rartrrrr)r r6r7r8fwr r r submits$   $zProcessPoolExecutor.submitr!)timeoutrWcs< |dkr tdtjtt|t|d|i|d}t|S)Nr!zchunksize must be >= 1.rW)r)rrFmaprr_r[r)r r6rrWrXZresultsrHr r rs zProcessPoolExecutor.mapTF)cancel_futurescCs|j||_d|_|jdur|jWdn1swY|jdur/|r/|jd|_d|_|jdurA|rA|j d|_d|_ d|_dSr) rurrrtrrrr{r|rry)r rrr r r shutdowns      zProcessPoolExecutor.shutdown)NNNr )T)rrrrrrrrrrExecutor__doc__rrrRr r rHr rLs U  rr@)0 __author__riZconcurrent.futuresrrrrZmultiprocessing.connectionZmultiprocessing.queuesrrrv functoolsrrUrr*WeakKeyDictionaryrrrr Z_register_atexitrr Exceptionr$r(r/objectr3r<rArBr[r_rdroZThreadrprrrrZBrokenExecutorrrrr r r r sR*       )