a DOgL@s$dZdZddlZddlZddlZddlZddlZddlZddlZz ddl Z Wne ybdZ Yn0ddl m Z ddl m Z ddl mZddl mZdd l mZdd l mZdd l mZdd l mZdd lmZddZGddde jZGdddejejZGdddeZGdddeZdS)zEvent loop using a selector and related classes. A selector is a "notify-when-ready" multiplexer. For a subclass which also includes support for signal handling, see the unix_events sub-module. )BaseSelectorEventLoopN) base_events) constants)events)futures) protocols)sslproto) transports)trsock)loggercCs6z||}Wnty"YdS0t|j|@SdSNF)get_keyKeyErrorboolr)selectorfdZeventkeyrZ!d?d@Z"dAdBZ#dCdDZ$dXdEdFZ%dGdHZ&dIdJZ'dKdLZ(dMdNZ)dOdPZ*dQdRZ+Z,S)YrzJSelector event loop. See events.EventLoop for API specification. NcsFt|durt}td|jj||_| t |_ dS)NzUsing selector: %s) super__init__ selectorsZDefaultSelectorr debug __class____name__ _selector_make_self_pipeweakrefZWeakValueDictionary _transports)selfrrrrr1s zBaseSelectorEventLoop.__init__extraservercCst||||||SN)_SelectorSocketTransport)r!sockprotocolwaiterr$r%rrr_make_socket_transport;s z,BaseSelectorEventLoop._make_socket_transportF) server_sideserver_hostnamer$r%ssl_handshake_timeoutc Cs0tj||||||| d} t||| ||d| jS)N)r.r#)r Z SSLProtocolr'Z_app_transport) r!Zrawsockr) sslcontextr*r,r-r$r%r.Z ssl_protocolrrr_make_ssl_transport@sz)BaseSelectorEventLoop._make_ssl_transportcCst||||||Sr&)_SelectorDatagramTransport)r!r(r)addressr*r$rrr_make_datagram_transportMsz.BaseSelectorEventLoop._make_datagram_transportcsL|rtd|rdS|t|jdurH|jd|_dS)Nz!Cannot close a running event loop)Z is_running RuntimeError is_closed_close_self_pipercloserr!r"rrr7Rs   zBaseSelectorEventLoop.closecCsB||j|jd|_|jd|_|jd8_dS)Nr)_remove_reader_ssockfilenor7_csock _internal_fdsr8rrrr6]s   z&BaseSelectorEventLoop._close_self_pipecCsNt\|_|_|jd|jd|jd7_||j|jdS)NFr) socketZ socketpairr:r< setblockingr= _add_readerr;_read_from_selfr8rrrres   z%BaseSelectorEventLoop._make_self_pipecCsdSr&rr!datarrr_process_self_datamsz(BaseSelectorEventLoop._process_self_datacCsTz"|jd}|sWqP||Wqty8YqYqtyLYqPYq0qdS)Ni)r:recvrDInterruptedErrorBlockingIOErrorrBrrrrAps   z%BaseSelectorEventLoop._read_from_selfcCsL|j}|durdSz|dWn&tyF|jrBtjdddYn0dS)Nz3Fail to write a null byte into the self-pipe socketTexc_info)r<sendOSError_debugr r)r!Zcsockrrr_write_to_self|s z$BaseSelectorEventLoop._write_to_selfdc Cs"|||j||||||dSr&)r@r;_accept_connection)r!protocol_factoryr(r/r%backlogr.rrr_start_servings z$BaseSelectorEventLoop._start_servingc Cst|D]}z0|\}} |jr0td|| ||dWntttfyXYdSt y} zl| j t j t j t j t jfvr|d| t|d|||tj|j||||||nWYd} ~ qd} ~ 00d| i} |||| |||} || qdS)Nz#%r got a new connection from %r: %rFz&socket.accept() out of system resource)message exceptionr>peername)rangeacceptrMr rr?rGrFConnectionAbortedErrorrLerrnoZEMFILEZENFILEZENOBUFSZENOMEMcall_exception_handlerr TransportSocketr9r;Z call_laterrZACCEPT_RETRY_DELAYrS_accept_connection2Z create_task) r!rQr(r/r%rRr._connaddrexcr$rXrrrrPs@    z(BaseSelectorEventLoop._accept_connectionc sd}d}zr|}|} |r8|j|||| d|||d}n|j||| ||d}z| IdHWntyv|Yn0WntttfyYn^ty} zF|jrd| d} |dur|| d<|dur|| d<|| WYd} ~ n d} ~ 00dS)NT)r*r,r$r%r.)r*r$r%z3Error on transport creation for incoming connection)rTrUr) transport) create_futurer0r+ BaseExceptionr7 SystemExitKeyboardInterruptrMr[) r!rQr_r$r/r%r.r)rbr*racontextrrrr]s@  z)BaseSelectorEventLoop._accept_connection2c Cs|}t|tsHzt|}Wn(tttfyFtd|dYn0z|j|}WntyhYn0|st d|d|dS)NzInvalid file object: zFile descriptor z is used by transport ) isinstanceintr;AttributeError TypeError ValueErrorr r is_closingr4)r!rr;rbrrr_ensure_fd_no_transports   z-BaseSelectorEventLoop._ensure_fd_no_transportc Gs|t|||d}z|j|}Wn(tyP|j|tj|dfYn>0|j|j }\}}|j ||tjB||f|dur| |Sr&) _check_closedrHandlerrrregisterr EVENT_READrCmodifycancel r!rcallbackargshandlermaskreaderwriterrrrr@s   z!BaseSelectorEventLoop._add_readercCs|r dSz|j|}Wnty0YdS0|j|j}\}}|tjM}|sb|j|n|j ||d|f|dur| dSdSdS)NFT) r5rrrrrCrrr unregisterrsrtr!rrryrzr{rrrr9 s  z$BaseSelectorEventLoop._remove_readerc Gs|t|||d}z|j|}Wn(tyP|j|tjd|fYn>0|j|j }\}}|j ||tjB||f|dur| |Sr&) rorrprrrrqr EVENT_WRITErCrsrtrurrr _add_writer!s   z!BaseSelectorEventLoop._add_writercCs|r dSz|j|}Wnty0YdS0|j|j}\}}|tjM}|sb|j|n|j |||df|dur| dSdSdS)Remove a writer callback.FNT) r5rrrrrCrr~r|rsrtr}rrr_remove_writer1s  z$BaseSelectorEventLoop._remove_writercGs"|||j||g|RdS)zAdd a reader callback.N)rnr@r!rrvrwrrr add_readerHs z BaseSelectorEventLoop.add_readercCs||||S)zRemove a reader callback.)rnr9r!rrrr remove_readerMs z#BaseSelectorEventLoop.remove_readercGs"|||j||g|RdS)zAdd a writer callback..N)rnrrrrr add_writerRs z BaseSelectorEventLoop.add_writercCs||||S)r)rnrrrrr remove_writerWs z#BaseSelectorEventLoop.remove_writerc st||jr$|dkr$tdz ||WSttfyFYn0|}| }| || ||j |||}| tj|j||d|IdHS)zReceive data from the socket. The return value is a bytes object representing the data received. The maximum amount of data to be received at once is specified by nbytes. rthe socket must be non-blockingrxN)r_check_ssl_socketrM gettimeoutrlrErGrFrcr;rnr@ _sock_recvadd_done_callback functoolspartial_sock_read_done)r!r(nfutrrxrrr sock_recv\s   zBaseSelectorEventLoop.sock_recvcCs|dus|s||dSr&) cancelledrr!rrrxrrrrrsz%BaseSelectorEventLoop._sock_read_donec Cs|r dSz||}WnZttfy2YdSttfyHYn8tyt}z||WYd}~nd}~00||dSr&) donerErGrFrerfrd set_exception set_result)r!rr(rrCrarrrrvs z BaseSelectorEventLoop._sock_recvc st||jr$|dkr$tdz ||WSttfyFYn0|}| }| || ||j |||}| tj|j||d|IdHS)zReceive data from the socket. The received data is written into *buf* (a writable buffer). The return value is the number of bytes written. rrrN)rrrMrrl recv_intorGrFrcr;rnr@_sock_recv_intorrrr)r!r(bufrrrxrrrsock_recv_intos   z$BaseSelectorEventLoop.sock_recv_intoc Cs|r dSz||}WnZttfy2YdSttfyHYn8tyt}z||WYd}~nd}~00||dSr&) rrrGrFrerfrdrr)r!rr(rnbytesrarrrrs z%BaseSelectorEventLoop._sock_recv_intoc st||jr$|dkr$tdz||}WnttfyLd}Yn0|t|kr^dS| }| }| || ||j ||t||g}|tj|j||d|IdHS)aSend data to the socket. The socket must be connected to a remote socket. This method continues to send data from data until either all data has been sent or an error occurs. None is returned on success. On error, an exception is raised, and there is no way to determine how much data, if any, was successfully processed by the receiving end of the connection. rrNr)rrrMrrlrKrGrFlenrcr;rnr _sock_sendall memoryviewrrr_sock_write_done)r!r(rCrrrrxrrr sock_sendalls&     z"BaseSelectorEventLoop.sock_sendallc Cs|r dS|d}z|||d}Wn\ttfyBYdSttfyXYn0ty}z||WYd}~dSd}~00||7}|t|kr| dn||d<dS)Nr) rrKrGrFrerfrdrrr)r!rr(Zviewposstartrrarrrrs    z#BaseSelectorEventLoop._sock_sendallcst||jr$|dkr$td|jtjksBtjrt|jtj krt|j ||j|j |j |dIdH}|d\}}}}}| }|||||IdHS)zTConnect to a remote socket at address. This method is a coroutine. rr)familytypeprotoloopN)rrrMrrlrr>ZAF_INETZ _HAS_IPv6ZAF_INET6Z_ensure_resolvedrrrc _sock_connect)r!r(r2Zresolvedr^rrrr sock_connects     z"BaseSelectorEventLoop.sock_connectc Cs|}z||Wnttfyb|||||j|||}|tj |j ||dYnNt t fyxYn8t y}z||WYd}~nd}~00|ddS)Nr)r;ZconnectrGrFrnr_sock_connect_cbrrrrrerfrdrr)r!rr(r2rrxrarrrrs    z#BaseSelectorEventLoop._sock_connectcCs|dus|s||dSr&)rrrrrrr sz&BaseSelectorEventLoop._sock_write_donec Cs|r dSz,|tjtj}|dkr6t|d|WnXttfyNYnNtt fydYn8t y}z| |WYd}~nd}~00| ddS)NrzConnect call failed ) rZ getsockoptr>Z SOL_SOCKETZSO_ERRORrLrGrFrerfrdrr)r!rr(r2errrarrrrs z&BaseSelectorEventLoop._sock_connect_cbcsBt||jr$|dkr$td|}||||IdHS)aWAccept a connection. The socket must be bound to an address and listening for connections. The return value is a pair (conn, address) where conn is a new socket object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection. rrN)rrrMrrlrc _sock_accept)r!r(rrrr sock_accept"s   z!BaseSelectorEventLoop.sock_acceptc Cs|}z|\}}|dWnttfyl|||||j||}|t j |j ||dYnRt t fyYn<ty}z||WYd}~nd}~00|||fdS)NFr)r;rXr?rGrFrnr@rrrrrrerfrdrr)r!rr(rr_r2rxrarrrr1s   z"BaseSelectorEventLoop._sock_acceptc s|j|j=|}||IdHzL|j|j|||ddIdHW||rZ|||j|j<S||j|j<n"||r|||j|j<0dS)NF)Zfallback) r _sock_fd is_reading pause_reading_make_empty_waiterZ sock_sendfile_sock_reset_empty_waiterresume_reading)r!Ztranspfileoffsetcountrrrr_sendfile_nativeBs*   z&BaseSelectorEventLoop._sendfile_nativecCs|D]v\}}|j|j}\}}|tj@rL|durL|jrB||n |||tj@r|dur|jrp||q||qdSr&) fileobjrCrrrZ _cancelledr9Z _add_callbackr~r)r!Z event_listrryrrzr{rrr_process_eventsPs    z%BaseSelectorEventLoop._process_eventscCs|||dSr&)r9r;r7)r!r(rrr _stop_serving^sz#BaseSelectorEventLoop._stop_serving)N)N)N)NNN)N)N)-r __module__ __qualname____doc__rr+rZSSL_HANDSHAKE_TIMEOUTr0r3r7r6rrDrArNrSrPr]rnr@r9rrrrrrrrrrrrrrrrrrrrrr __classcell__rrr"rr+sj        . )  rcseZdZdZeZdZdfdd ZddZddZ d d Z d d Z d dZ ddZ ejfddZdddZddZddZddZddZZS) _SelectorTransportiNcst||t||jd<z||jd<WntyLd|jd<Yn0d|jvrz||jd<Wntj yd|jd<Yn0||_ | |_ d|_ ||||_||_d|_d|_|jdur|j||j|j <dS)Nr>ZsocknamerVFr)rrr r\_extraZ getsocknamerLZ getpeernamer>errorrr;r_protocol_connected set_protocol_server_buffer_factory_buffer _conn_lost_closingZ_attachr )r!rr(r)r$r%r"rrros,       z_SelectorTransport.__init__cCs|jjg}|jdur |dn|jr0|d|d|j|jdur|jst|jj |jt j }|rz|dn |dt|jj |jt j }|rd}nd}| }|d|d |d d d |S) Nclosedclosingzfd=z read=pollingz read=idlepollingZidlezwrite=z<{}> )rrrappendrr_loopr5rrrrrr~get_write_buffer_sizeformatjoin)r!inforstatebufsizerrr__repr__s.      z_SelectorTransport.__repr__cCs|ddSr&) _force_closer8rrrabortsz_SelectorTransport.abortcCs||_d|_dSNT) _protocolrr!r)rrrrsz_SelectorTransport.set_protocolcCs|jSr&)rr8rrr get_protocolsz_SelectorTransport.get_protocolcCs|jSr&)rr8rrrrmsz_SelectorTransport.is_closingcCsT|jr dSd|_|j|j|jsP|jd7_|j|j|j|jddSNTr) rrr9rrrr call_soon_call_connection_lostr8rrrr7sz_SelectorTransport.closecCs,|jdur(|d|t|d|jdS)Nzunclosed transport )source)rResourceWarningr7)r!Z_warnrrr__del__s z_SelectorTransport.__del__Fatal error on transportcCsNt|tr(|jr@tjd||ddn|j||||jd||dS)Nz%r: %sTrI)rTrUrbr)) rhrLr get_debugr rr[rr)r!rarTrrr _fatal_errors  z_SelectorTransport._fatal_errorcCsd|jr dS|jr(|j|j|j|jsBd|_|j|j|jd7_|j|j |dSr) rrclearrrrrr9rrr!rarrrrs z_SelectorTransport._force_closecCszN|jr|j|W|jd|_d|_d|_|j}|dur|d|_n:|jd|_d|_d|_|j}|dur|d|_0dSr&)rrZconnection_lostrr7rrZ_detach)r!rar%rrrrs&  z(_SelectorTransport._call_connection_lostcCs t|jSr&)rrr8rrrrsz(_SelectorTransport.get_write_buffer_sizecGs$|jr dS|jj||g|RdSr&)rrr@rrrrr@sz_SelectorTransport._add_reader)NN)r)rrrmax_size bytearrayrrrrrrrrmr7warningswarnrrrrrr@rrrr"rrcs    rcseZdZdZejjZd#fdd ZfddZ ddZ d d Z d d Z d dZ ddZddZddZddZddZddZddZfddZdd Zd!d"ZZS)$r'TNcs~d|_t|||||d|_d|_d|_t|j|j |j j ||j |j |j|j|durz|j tj|ddSr )_read_ready_cbrr_eof_paused _empty_waiterrZ _set_nodelayrrrrconnection_mader@r _read_readyr_set_result_unless_cancelled)r!rr(r)r*r$r%r"rrrs   z!_SelectorSocketTransport.__init__cs.t|tjr|j|_n|j|_t|dSr&)rhrZBufferedProtocol_read_ready__get_bufferr_read_ready__data_receivedrrrr"rrrs  z%_SelectorSocketTransport.set_protocolcCs|j o|j Sr&)rrr8rrrrsz#_SelectorSocketTransport.is_readingcCs>|js |jrdSd|_|j|j|jr:td|dS)NTz%r pauses reading)rrrr9rrr rr8rrrrs   z&_SelectorSocketTransport.pause_readingcCs@|js |jsdSd|_||j|j|jr      z_SelectorSocketTransport.writec Cs&|jsJd|jrdSz|j|j}Wnttfy@YnttfyVYnty}zF|j |j |j | |d|jdur|j|WYd}~nxd}~00|r|jd|=||js"|j |j |jdur|jd|jr |dn|jr"|jtjdS)NzData should not be emptyr)rrrrKrGrFrerfrdrrrrrrr_maybe_resume_protocolrrrrshutdownr>SHUT_WR)r!rrarrrrs4   "    z%_SelectorSocketTransport._write_readycCs.|js |jrdSd|_|js*|jtjdSr)rrrrrr>rr8rrr write_eofs  z"_SelectorSocketTransport.write_eofcCsdSrrr8rrr can_write_eofsz&_SelectorSocketTransport.can_write_eofcs*t||jdur&|jtddS)NzConnection is closed by peer)rrrrConnectionErrorrr"rrrs   z._SelectorSocketTransport._call_connection_lostcCs6|jdurtd|j|_|js0|jd|jS)NzEmpty waiter is already set)rr4rrcrrr8rrrrs    z+_SelectorSocketTransport._make_empty_waitercCs d|_dSr&)rr8rrrrsz,_SelectorSocketTransport._reset_empty_waiter)NNN)rrrZ_start_tls_compatiblerZ _SendfileModeZ TRY_NATIVEZ_sendfile_compatiblerrrrrrrrrrrrrrrrrrrr"rr's( %' r'csFeZdZejZd fdd ZddZddZd dd Z d d Z Z S)r1Ncs^t||||||_|j|jj||j|j|j|j |durZ|jt j |ddSr&) rr_addressrrrrr@rrrr)r!rr(r)r2r*r$r"rrrs  z#_SelectorDatagramTransport.__init__cCstdd|jDS)Ncss|]\}}t|VqdSr&)r).0rCr^rrr zC_SelectorDatagramTransport.get_write_buffer_size..)sumrr8rrrrsz0_SelectorDatagramTransport.get_write_buffer_sizec Cs|jr dSz|j|j\}}Wnttfy6Yntyd}z|j|WYd}~n\d}~0t t fyzYn>t y}z| |dWYd}~nd}~00|j ||dS)Nz&Fatal read error on datagram transport)rrZrecvfromrrGrFrLrerror_receivedrerfrdrZdatagram_receivedr!rCr`rarrrrs "z&_SelectorDatagramTransport._read_readyc Cst|tttfs$tdt|j|s,dS|jrV|d|jfvrPtd|j|j}|j r|jr|j t j krxt d|j d7_ dS|jsdz,|jdr|j|n|j||WdSttfy|j|j|jYn~ty}z|j|WYd}~dSd}~0ttfy0Yn4tyb}z||dWYd}~dSd}~00|j t||f|!dS)Nrz!Invalid address: must be None or rrrV'Fatal write error on datagram transport)"rhrrrrkrrr rlrrrr rrrrrKsendtorGrFrrr _sendto_readyrLrrrerfrdrrrrrrrrsJ      z!_SelectorDatagramTransport.sendtoc Cs|jr|j\}}z*|jdr.|j|n|j||Wqttfyh|j||fYqYqt y}z|j |WYd}~dSd}~0t t fyYqty}z||dWYd}~dSd}~00q||js|j|j|jr|ddS)NrVr)rpopleftrrrKrrGrF appendleftrLrrrerfrdrrrrrrrrrrrr0s0  z(_SelectorDatagramTransport._sendto_ready)NNN)N) rrr collectionsdequerrrrrrrrrr"rr1s  +r1)r__all__rrZrrr>rrZssl ImportErrorrrrrrr r r logr rZ BaseEventLooprZ_FlowControlMixinZ Transportrr'r1rrrrsD             <o