g0$ UddlZddlZddlZddlZddlZddlZddlZddlZddlZddl m Z ddl m Z m Z m Z ddlmZddlZddlmZdZdZdZejGd d ZGd d ejjZd efdZde de d edefdZejGddZia e!edzefe"d<efdedzd ededej#fdZ$defdZ%dS)N)datetime)EventProcessQueue) SentryHandler)init_lve_utils_sentry_client i,z4%(asctime)s - %(name)s - %(levelname)s - %(message)sceZdZUdZeed<edzed<edzed<edzed<edzed<edzed<ed e j fd Z d Z dS) ProcessDescriptiona Process description object pid: int - process id ppid: int | None - parent process id user: str | None - process owner status: str | None - process status start_time: datetime | None - process start time cmdline: str | None - process command line pidNppiduserstatus start_timecmdlineprocessc |}n#t$rd}YnwxYw d|}n#t$rd}YnwxYw t j|}n#t$rd}YnwxYw |}n#t$rd}YnwxYw |}n#t$rd}YnwxYw||j |||||S)z>DD   DDD  hhw0011GG   GGG  !/0C0C0E0EFFJJ   JJJ  ##%%DD   DDD  ^^%%FF   FFF s7;dFJHHHsS &&'A A! A!%&B BBB44 CCC C+*C+c fd|jd|jd|jd|jd|jd|j S)NzPID: z, PPID: z, User: z , Status: z Start time: z Command: )r r rrrrselfs rto_strzProcessDescription.to_strMsrEtxEEEEDIEEQUQ\EE?EE6:lEE E) __name__ __module__ __qualname____doc__int__annotations__strr classmethodpsutilrrr r!rr r s   HHH * * $J4 4ZI&.III[I0EEEEEr!r c^eZdZeeddedeffdZdZfdZdZ dZ d Z d Z d Z xZS) QueueListenerWithHistory)timeoutrecords_history_sizer.r/ctj|g|Rdditjg||_||_d|_dS)aQueue listener with log records history :param multiprocessing.Queue queue: log records queue :param int timeout: log records flow timeout, defaults to DEFAULT_LOG_EVENTS_TIMEOUT In case timeput is reached, last log records and process ancestors are logged :param int records_history_size: the max number of records to log in case of timeout respect_handler_levelT)iterablemaxlenFN)super__init__ collectionsdeque_records_history_queue_timeout_timout_registred)rqueuer.r/handlers __class__s rr5z!QueueListenerWithHistory.__init__Ssf FFFFFFFLWL]'M M M #  !&r!c |j|j}|j|n*#tj$r|YnwxYwf|S)zDequeue log eventTr.)r;getr9r8appendEmpty_handle_queue_timeout)r_blockrecords rdequeuez QueueListenerWithHistory.dequeueis~ - - >>+226:::; - - -**,,,,, -  - s:>#A$#A$c|t|dS)zMain loop to process log eventsN)_handle_monitor_startedr4_monitor_handle_monitor_finished)rr=s rrIz!QueueListenerWithHistory._monitortsC $$&&&  %%'''''r!c6tj|_dS)z+Start monitoring log events processing timeN)time _start_tsrs rrHz0QueueListenerWithHistory._handle_monitor_startedzsr!c|jdurc|tjdt j|jz t j}||dSdS)z0Log that monitor finished processing log recordsTz/Processing log queue took %.4f seconds. Pid: %sN) r:_create_recordloggingERRORrLrMosgetpidhandle)rrs rrJz1QueueListenerWithHistory._handle_monitor_finished~sc  !T ) )## A dn, A KKNNNNN * )r!cvg} tj}t|g}|x}dkr[tj|}|t||x}dk[|S#t $r|cYSwxYw)z%Collect process ancestors up to pid 1r)r*rr rr rAr)r ancestorsrr s r_get_ancestorsz'QueueListenerWithHistory._get_ancestorss.0  n&&G+77@@AI"<<>>)4a// ...  !3!?!?!H!HIII#<<>>)4a//        sB$B)) B87B8c Btjt|dd||dS)zCreate log recordN)namelevelpathnamelinenomsgargsexc_info)rP LogRecordr")rr]r`ras rrOz'QueueListenerWithHistory._create_records1     r!cN|jdurdd|dddD}dd|jD}d}||f}|jt j|g|R}||d|_dSdS) z Handle log queue timeout event. Print last events (incl. omitted because debug disabled) and process ancestors to log. F c3>K|]}|VdS)N)r ).0 proc_descrs r zAQueueListenerWithHistory._handle_queue_timeout..s?''(2 !!##''''''r!Nc3nK|]0}d|jdtj|jd|jV1dS)[z] - z: N) levelnamerrcreatedmessage)rgrEs rrizAQueueListenerWithHistory._handle_queue_timeout..sc$$eF$dd(*@*P*PddTZTbdd$$$$$$r!z8Log queue timeout. Last events: %s Process ancestors: %sT)r:rrXr8rOrPrQrT)rproc_ancestors last_events error_msg error_argsrUs rrCz.QueueListenerWithHistory._handle_queue_timeouts  !U * *!YY''6:6I6I6K6KDDbD6Q'''N))$$"9$$$KVI%~6J##GM9JzJJJA KKNNN%)D " " " + *r!)r"r#r$DEFAULT_LOG_EVENTS_TIMEOUTDEFAULT_RECORDS_HISTORY_SIZEr&r5rFrIrHrJrXrOrC __classcell__)r=s@rr-r-Rs 6(D ''' ' #& '''''',   ((((( %%%          *******r!r- file_namectj|dr tjn tjS)z*Get log level based on debug flag presencez .debug.flag)pathlibPathexistsrPDEBUGINFO)rws r get_log_levelr~s6#LI)B)B)BCCJJLL ^7==RYR^^r!q stop_eventr.cttdtj}tj|}|tjt|t|t||||}| | | dS)z Main function to process log events in subprocess :param Queue q: log records queue :param Event stop_event: stop event :param str file_name: log file name lvectl)r]r?N)rrrPrQ FileHandler setFormatter FormatterDEFAULT_LOG_FORMATsetLevelr~r-startwaitstop)rrrwr.sentry_handlerfh q_handlers rrun_subpprocess_loggerrs##?#I#IQXQ^___N  Y ' 'BOOG%&899:::KK i(()))(BPPPI OOOO NNr!c$eZdZUeed<eed<dS)SubprocessLoggerlogger_processrN)r"r#r$rr'rr+r!rrrs*r!rsubprocess_loggers logger_namereturncht|}|tj|Stj|}|tjt }tj|}|tj| |t}tt||||f}| tj|jt#||t|<|S)a Get asynchronous logger running in subprocess Log records asynchronously in subprocess (>=info by default, >= debug when debug flag is present). Log records from debug and higher in subprocess in case of timeout. WARNING: multiprocessing.Queue is used to pass log records. It works asynchornously (passing to subprocess in handled by separate thread). This may lead to log records not being passed to the subprocess in case main process got stuck e.g. in kmodlve kernel space. :param str | None logger_name: logger name :param str file_name: log file :param int timeout: log events processing timeout N)targetra)rr@rP getLoggerrr|rr< QueueHandler addHandlerrrrratexitregistersetr) rrwr.subprocess_loggerloggersubprocess_logger_queue queue_handlerrrs rinit_subprocess_loggerrs(+..{;;$ ---  { + +F OOGM"""%*WW$112IJJM7=))) m$$$J$:#:J SZ"[]]]N OJN###&6~z&R&R{# Mr!r\ct|}|dS|j|jt|=dS)z Stop subprocess logger N)rr@rrrr)r\rs rstop_subprocess_loggerrs[ +..t44  $$&&&$))+++4   r!)&rr6 dataclassesrPlogging.handlersrRryr;rLrmultiprocessingrrrraven.handlers.loggingrr*lve_utils.sentryrrurtr dataclassr r< QueueListenerr-r(r~r&rrrdictr'Loggerrrr+r!rrsg   1111111111000000 999999 " K .E.E.E.E.E.E.E.Eba*a*a*a*a*w/=a*a*a*H_S____ e3QT(  :<Dt%556;;; .))t))) ^ ))))X ! ! ! ! ! ! !r!