U ifT@s.dZdZddlZddlZddlZddlZddlZddlZddlZz ddl Z Wne k rddZ YnXddl m Z ddl m Z ddl mZddl mZdd l mZdd l mZdd l mZdd l mZdd lmZddZddZGddde jZGdddejejZGdddeZGdddeZdS)zEvent loop using a selector and related classes. A selector is a "notify-when-ready" multiplexer. For a subclass which also includes support for signal handling, see the unix_events sub-module. )BaseSelectorEventLoopN) base_events) constants)events)futures) protocols)sslproto) transports)trsock)loggercCs8z||}Wntk r$YdSXt|j|@SdSNF)get_keyKeyErrorboolr)selectorfdZeventkeyrZ!d?d@Z"dAdBZ#dCdDZ$dEdFZ%dGdHZ&dIdJZ'dKdLZ(dMdNZ)dOdPZ*dQdRZ+Z,S)WrzJSelector event loop. See events.EventLoop for API specification. NcsFt|dkrt}td|jj||_| t |_ dS)NzUsing selector: %s) super__init__ selectorsZDefaultSelectorr debug __class____name__ _selector_make_self_pipeweakrefZWeakValueDictionary _transports)selfrr rrr6s zBaseSelectorEventLoop.__init__extraservercCst||||||SN)_SelectorSocketTransport)r&rprotocolwaiterr)r*rrr_make_socket_transport@s z,BaseSelectorEventLoop._make_socket_transportF) server_sideserver_hostnamer)r*ssl_handshake_timeoutc Cs0tj||||||| d} t||| ||d| jS)N)r2r()r Z SSLProtocolr,Z_app_transport) r&Zrawsockr- sslcontextr.r0r1r)r*r2Z ssl_protocolrrr_make_ssl_transportEsz)BaseSelectorEventLoop._make_ssl_transportcCst||||||Sr+)_SelectorDatagramTransport)r&rr-addressr.r)rrr_make_datagram_transportRs z.BaseSelectorEventLoop._make_datagram_transportcsL|rtd|rdS|t|jdk rH|jd|_dS)Nz!Cannot close a running event loop)Z is_running RuntimeError is_closed_close_self_pipercloser"r&r'rrr;Ws   zBaseSelectorEventLoop.closecCsB||j|jd|_|jd|_|jd8_dS)Nr)_remove_reader_ssockfilenor;_csock _internal_fdsr<rrrr:bs   z&BaseSelectorEventLoop._close_self_pipecCsNt\|_|_|jd|jd|jd7_||j|jdS)NFr) socketZ socketpairr>r@ setblockingrA _add_readerr?_read_from_selfr<rrrr#js   z%BaseSelectorEventLoop._make_self_pipecCsdSr+rr&datarrr_process_self_datarsz(BaseSelectorEventLoop._process_self_datacCsXz"|jd}|sWqT||Wqtk r:YqYqtk rPYqTYqXqdS)Ni)r>recvrHInterruptedErrorBlockingIOErrorrFrrrrEus z%BaseSelectorEventLoop._read_from_selfcCsN|j}|dkrdSz|dWn(tk rH|jrDtjdddYnXdS)Nz3Fail to write a null byte into the self-pipe socketTexc_info)r@sendOSError_debugr r)r&Zcsockrrr_write_to_selfsz$BaseSelectorEventLoop._write_to_selfdc Cs"|||j||||||dSr+)rDr?_accept_connection)r&protocol_factoryrr3r*backlogr2rrr_start_servingsz$BaseSelectorEventLoop._start_servingc Cst|D]}z0|\}} |jr0td|| ||dWntttfk rZYdSt k r} zd| j t j t j t j t jfkr|d| t|d|||tj|j||||||nW5d} ~ XYqXd| i} |||| |||} || qdS)Nz#%r got a new connection from %r: %rFz&socket.accept() out of system resource)message exceptionrBpeername)rangeacceptrQr rrCrKrJConnectionAbortedErrorrPerrnoZEMFILEZENFILEZENOBUFSZENOMEMcall_exception_handlerr TransportSocketr=r?Z call_laterrZACCEPT_RETRY_DELAYrW_accept_connection2Z create_task) r&rUrr3r*rVr2_connaddrexcr)r\rrrrTsV   z(BaseSelectorEventLoop._accept_connectionc sd}d}zt|}|} |r8|j|||| d|||d}n|j||| ||d}z| IdHWntk rx|YnXWntttfk rYn\tk r} z>|jrd| d} |dk r|| d<|dk r|| d<|| W5d} ~ XYnXdS)NT)r.r0r)r*r2)r.r)r*z3Error on transport creation for incoming connection)rXrYr- transport) create_futurer4r/ BaseExceptionr; SystemExitKeyboardInterruptrQr_) r&rUrcr)r3r*r2r-rfr.recontextrrrrasP z)BaseSelectorEventLoop._accept_connection2c Cs|}t|tsJzt|}Wn*tttfk rHtd|dYnXz|j|}Wntk rlYnX|st d|d|dS)NzInvalid file object: zFile descriptor z is used by transport ) rintr?AttributeErrorr ValueErrorr%r is_closingr8)r&rr?rfrrr_ensure_fd_no_transports z-BaseSelectorEventLoop._ensure_fd_no_transportc Gs|t|||d}z|j|}Wn*tk rR|j|tj|dfYn>X|j|j }\}}|j ||tjB||f|dk r| dSr+) _check_closedrHandler"rrregisterr EVENT_READrGmodifycancel r&rcallbackargsZhandlermaskreaderwriterrrrrDs  z!BaseSelectorEventLoop._add_readercCs|r dSz|j|}Wntk r2YdSX|j|j}\}}|tjM}|sd|j|n|j ||d|f|dk r| dSdSdSNFT) r9r"rrrrGrrt unregisterrurvr&rrrzr{r|rrrr=s z$BaseSelectorEventLoop._remove_readerc Gs|t|||d}z|j|}Wn*tk rR|j|tjd|fYn>X|j|j }\}}|j ||tjB||f|dk r| dSr+) rqrrrr"rrrsr EVENT_WRITErGrurvrwrrr _add_writer%s  z!BaseSelectorEventLoop._add_writercCs|r dSz|j|}Wntk r2YdSX|j|j}\}}|tjM}|sd|j|n|j |||df|dk r| dSdSdS)Remove a writer callback.FNT) r9r"rrrrGrrr~rurvrrrr_remove_writer4s z$BaseSelectorEventLoop._remove_writercGs|||j||f|S)zAdd a reader callback.)rprDr&rrxryrrr add_readerKs z BaseSelectorEventLoop.add_readercCs||||S)zRemove a reader callback.)rpr=r&rrrr remove_readerPs z#BaseSelectorEventLoop.remove_readercGs|||j||f|S)zAdd a writer callback..)rprrrrr add_writerUs z BaseSelectorEventLoop.add_writercCs||||S)r)rprrrrr remove_writerZs z#BaseSelectorEventLoop.remove_writerc st||jr"|dkr"tdz ||WSttfk rFYnX|}|}| ||j |||| t |j||IdHS)zReceive data from the socket. The return value is a bytes object representing the data received. The maximum amount of data to be received at once is specified by nbytes. rthe socket must be non-blockingN)rrQ gettimeoutrnrIrKrJrgr?r _sock_recvadd_done_callback functoolspartial_sock_read_done)r&rnfutrrrr sock_recv_s  zBaseSelectorEventLoop.sock_recvcCs||dSr+)rr&rrrrrrtsz%BaseSelectorEventLoop._sock_read_donec Cs|r dSz||}Wn\ttfk r4YdSttfk rLYn6tk rv}z||W5d}~XYn X||dSr+) donerIrKrJrirjrh set_exception set_result)r&rrrrGrerrrrwsz BaseSelectorEventLoop._sock_recvc st||jr"|dkr"tdz ||WSttfk rFYnX|}|}| ||j |||| t |j||IdHS)zReceive data from the socket. The received data is written into *buf* (a writable buffer). The return value is the number of bytes written. rrN)rrQrrn recv_intorKrJrgr?r_sock_recv_intorrrr)r&rbufrrrrrsock_recv_intos  z$BaseSelectorEventLoop.sock_recv_intoc Cs|r dSz||}Wn\ttfk r4YdSttfk rLYn6tk rv}z||W5d}~XYn X||dSr+) rrrKrJrirjrhrr)r&rrrnbytesrerrrrsz%BaseSelectorEventLoop._sock_recv_intoc st||jr"|dkr"tdz||}Wnttfk rLd}YnX|t|kr^dS|}| }| t |j ||||j||t||g|IdHS)aSend data to the socket. The socket must be connected to a remote socket. This method continues to send data from data until either all data has been sent or an error occurs. None is returned on success. On error, an exception is raised, and there is no way to determine how much data, if any, was successfully processed by the receiving end of the connection. rrN)rrQrrnrOrKrJlenrgr?rrr_sock_write_doner _sock_sendall memoryview)r&rrGrrrrrr sock_sendalls&    z"BaseSelectorEventLoop.sock_sendallc Cs|r dS|d}z|||d}Wnbttfk rDYdSttfk r\Yn2tk r}z||WYdSd}~XYnX||7}|t|kr| dn||d<dS)Nr) rrOrKrJrirjrhrrr)r&rrZviewposstartrrerrrrs    z#BaseSelectorEventLoop._sock_sendallcst||jr"|dkr"tdttdr8|jtjkrf|j||j|j |dIdH}|d\}}}}}| }| ||||IdHS)zTConnect to a remote socket at address. This method is a coroutine. rrAF_UNIX)familyprotoloopN) rrQrrnhasattrrBrrZ_ensure_resolvedrrg _sock_connect)r&rr6Zresolvedrbrrrr sock_connects z"BaseSelectorEventLoop.sock_connectc Cs|}z||Wnttfk rV|t|j||||j |||YnNt t fk rnYn6t k r}z| |W5d}~XYn X|ddSr+)r?ZconnectrKrJrrrrr_sock_connect_cbrirjrhrr)r&rrr6rrerrrrs z#BaseSelectorEventLoop._sock_connectcCs||dSr+)rrrrrrsz&BaseSelectorEventLoop._sock_write_donec Cs|r dSz,|tjtj}|dkr6t|d|WnZttfk rPYnNtt fk rhYn6t k r}z| |W5d}~XYn X| ddS)NrzConnect call failed ) rZ getsockoptrBZ SOL_SOCKETZSO_ERRORrPrKrJrirjrhrr)r&rrr6errrerrrrsz&BaseSelectorEventLoop._sock_connect_cbcsBt||jr"|dkr"td|}||d||IdHS)aWAccept a connection. The socket must be bound to an address and listening for connections. The return value is a pair (conn, address) where conn is a new socket object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection. rrFN)rrQrrnrg _sock_accept)r&rrrrr sock_accepts z!BaseSelectorEventLoop.sock_acceptc Cs|}|r|||r"dSz|\}}|dWnnttfk rh|||j|d|YnRt t fk rYn:t k r}z| |W5d}~XYnX| ||fdSr})r?rrr\rCrKrJrrrirjrhrr)r&rZ registeredrrrcr6rerrrr*s  z"BaseSelectorEventLoop._sock_acceptc sp|j|j=|}||IdHz |j|j|||ddIdHWS||r^|||j|j<XdS)NF)Zfallback) r%_sock_fd is_reading pause_reading_make_empty_waiter_reset_empty_waiterresume_readingZ sock_sendfile_sock)r&Ztranspfileoffsetcountrrrr_sendfile_native<s z&BaseSelectorEventLoop._sendfile_nativecCs|D]v\}}|j|j}\}}|tj@rL|dk rL|jrB||n |||tj@r|dk r|jrp||q||qdSr+) fileobjrGrrtZ _cancelledr=Z _add_callbackrr)r&Z event_listrrzrr{r|rrr_process_eventsJs    z%BaseSelectorEventLoop._process_eventscCs|||dSr+)r=r?r;)r&rrrr _stop_servingXsz#BaseSelectorEventLoop._stop_serving)N)N)N)NNN)-r! __module__ __qualname____doc__rr/rZSSL_HANDSHAKE_TIMEOUTr4r7r;r:r#rHrErRrWrTrarprDr=rrrrrrrrrrrrrrrrrrrrrr __classcell__rrr'rr0s~        . )rcseZdZdZeZdZdfdd ZddZddZ d d Z d d Z d dZ ddZ ejfddZdddZddZddZddZddZZS) _SelectorTransportiNcst||t||jd<z||jd<Wntk rNd|jd<YnXd|jkrz||jd<Wn tj k rd|jd<YnX||_ | |_ d|_ ||||_||_d|_d|_|jdk r|j||j|j <dS)NrBZsocknamerZFr)rrr r`_extraZ getsocknamerPZ getpeernamerBerrorrr?r_protocol_connected set_protocol_server_buffer_factory_buffer _conn_lost_closingZ_attachr%)r&rrr-r)r*r'rrris,      z_SelectorTransport.__init__cCs|jjg}|jdkr |dn|jr0|d|d|j|jdk r|jst|jj |jt j }|rz|dn |dt|jj |jt j }|rd}nd}| }|d|d |d d d |S) Nclosedclosingzfd=z read=pollingz read=idlepollingZidlezwrite=z<{}> )r r!rappendrr_loopr9rr"rrtrget_write_buffer_sizeformatjoin)r&inforstatebufsizerrr__repr__s0      z_SelectorTransport.__repr__cCs|ddSr+) _force_closer<rrrabortsz_SelectorTransport.abortcCs||_d|_dSNT) _protocolrr&r-rrrrsz_SelectorTransport.set_protocolcCs|jSr+)rr<rrr get_protocolsz_SelectorTransport.get_protocolcCs|jSr+)rr<rrrrosz_SelectorTransport.is_closingcCsT|jr dSd|_|j|j|jsP|jd7_|j|j|j|jddSNTr) rrr=rrrr call_soon_call_connection_lostr<rrrr;sz_SelectorTransport.closecCs,|jdk r(|d|t|d|jdS)Nzunclosed transport )source)rResourceWarningr;)r&Z_warnrrr__del__s z_SelectorTransport.__del__Fatal error on transportcCsNt|tr(|jr@tjd||ddn|j||||jd||dS)Nz%r: %sTrM)rXrYrfr-) rrPr get_debugr rr_rr)r&rerXrrr _fatal_errors  z_SelectorTransport._fatal_errorcCsd|jr dS|jr(|j|j|j|jsBd|_|j|j|jd7_|j|j |dSr) rrclearrrrrr=rrr&rerrrrs z_SelectorTransport._force_closecCsVz|jr|j|W5|jd|_d|_d|_|j}|dk rP|d|_XdSr+)rr;rrrZ_detachrZconnection_lost)r&rer*rrrrs z(_SelectorTransport._call_connection_lostcCs t|jSr+)rrr<rrrrsz(_SelectorTransport.get_write_buffer_sizecGs"|jr dS|jj||f|dSr+)rrrDrrrrrDsz_SelectorTransport._add_reader)NN)r)r!rrmax_size bytearrayrrrrrrrror;warningswarnrrrrrrDrrrr'rr]s    rcseZdZdZejjZd#fdd ZfddZ ddZ d d Z d d Z d dZ ddZddZddZddZddZddZddZfddZdd Zd!d"ZZS)$r,TNcs~d|_t|||||d|_d|_d|_t|j|j |j j ||j |j |j|j|dk rz|j tj|ddSr )_read_ready_cbrr_eof_paused _empty_waiterrZ _set_nodelayrrrrconnection_maderDr _read_readyr_set_result_unless_cancelled)r&rrr-r.r)r*r'rrrs    z!_SelectorSocketTransport.__init__cs.t|tjr|j|_n|j|_t|dSr+)rrZBufferedProtocol_read_ready__get_bufferr_read_ready__data_receivedrrrr'rrr s  z%_SelectorSocketTransport.set_protocolcCs|j o|j Sr+)rrr<rrrrsz#_SelectorSocketTransport.is_readingcCs>|js |jrdSd|_|j|j|jr:td|dS)NTz%r pauses reading)rrrr=rrr rr<rrrrs   z&_SelectorSocketTransport.pause_readingcCs@|js |jsdSd|_||j|j|jrYn4tk rp}z| |dWYdSd}~XYnX|r|j |j n| dS)Nz%r received EOFz1Fatal error: protocol.eof_received() call failed.) rrr rrZ eof_receivedrirjrhrr=rr;)r&Z keep_openrerrrres  z,_SelectorSocketTransport._read_ready__on_eofc Cs6t|tttfs$tdt|j|jr2td|j dk rDtd|sLdS|j rz|j t j krht d|j d7_ dS|jsz|j|}Wnbttfk rYnbttfk rYnJtk r}z||dWYdSd}~XYnX||d}|s dS|j|j|j|j||dS)N/data argument must be a bytes-like object, not z%Cannot call write() after write_eof()z(unable to write; sendfile is in progresssocket.send() raised exception.r%Fatal write error on socket transport)rbytesrrrtyper!rr8rrr!LOG_THRESHOLD_FOR_CONNLOST_WRITESr warningrrrOrKrJrirjrhrrrr _write_readyextend_maybe_pause_protocol)r&rGrrerrrwritezs:      z_SelectorSocketTransport.writec Cs(|jstd|jrdSz|j|j}Wnttfk rBYnttfk rZYnt k r}z>|j |j |j ||d|jdk r|j|W5d}~XYnpX|r|jd|=||js$|j |j |jdk r|jd|jr|dn|jr$|jtjdS)NzData should not be emptyr)rAssertionErrorrrrOrKrJrirjrhrrrrrrr_maybe_resume_protocolrrrrshutdownrBSHUT_WR)r&rrerrrrs4       z%_SelectorSocketTransport._write_readycCs.|js |jrdSd|_|js*|jtjdSr)rrrrrrBrr<rrr write_eofs  z"_SelectorSocketTransport.write_eofcCsdSrrr<rrr can_write_eofsz&_SelectorSocketTransport.can_write_eofcs*t||jdk r&|jtddS)NzConnection is closed by peer)rrrrConnectionErrorrr'rrrs   z._SelectorSocketTransport._call_connection_lostcCs6|jdk rtd|j|_|js0|jd|jS)NzEmpty waiter is already set)rr8rrgrrr<rrrrs    z+_SelectorSocketTransport._make_empty_waitercCs d|_dSr+)rr<rrrrsz,_SelectorSocketTransport._reset_empty_waiter)NNN)r!rrZ_start_tls_compatiblerZ _SendfileModeZ TRY_NATIVEZ_sendfile_compatiblerrrrrrrrrrrr r rrrrrrr'rr,s* %' r,csFeZdZejZd fdd ZddZddZd dd Z d d Z Z S)r5Ncs^t||||||_|j|jj||j|j|j|j |dk rZ|jt j |ddSr+) rr_addressrrrrrDrrrr)r&rrr-r6r.r)r'rrrs  z#_SelectorDatagramTransport.__init__cCstdd|jDS)Ncss|]\}}t|VqdSr+)r).0rGrbrrr szC_SelectorDatagramTransport.get_write_buffer_size..)sumrr<rrrrsz0_SelectorDatagramTransport.get_write_buffer_sizec Cs|jr dSz|j|j\}}Wnttfk r8Yntk rd}z|j|W5d}~XYnTt t fk r|Yn<t k r}z| |dW5d}~XYnX|j ||dS)Nz&Fatal read error on datagram transport)rrZrecvfromrrKrJrPrerror_receivedrirjrhrZdatagram_receivedr&rGrdrerrrrsz&_SelectorDatagramTransport._read_readyc Cst|tttfs$tdt|j|s,dS|jrV|d|jfkrPtd|j|j}|j r|jr|j t j krxt d|j d7_ dS|jslz,|jdr|j|n|j||WdSttfk r|j|j|jYntk r}z|j|WYdSd}~XYnPttfk r6Yn6tk rj}z||dWYdSd}~XYnX|j t||f|!dS)Nrz!Invalid address: must be None or rrrZ'Fatal write error on datagram transport)"rrrrrrr!r rnrrrr rrrrrOsendtorKrJrrr _sendto_readyrPrrrirjrhrrrrrrrrsH      z!_SelectorDatagramTransport.sendtoc Cs|jr|j\}}z*|jdr.|j|n|j||Wqttfk rj|j||fYqYqt k r}z|j |WYdSd}~XYqt t fk rYqtk r}z||dWYdSd}~XYqXq||js|j|j|jr|ddS)NrZr)rpopleftrrrOrrKrJ appendleftrPrrrirjrhrrrrrrrrrrrr*s2  z(_SelectorDatagramTransport._sendto_ready)NNN)N) r!rr collectionsdequerrrrrrrrrr'rr5s  +r5)r__all__rr^rrrBrr$r ImportErrorrrrrrr r r logr rrZ BaseEventLooprZ_FlowControlMixinZ Transportrr,r5rrrrsF            1o