# Tests for the core ``greenlet`` type: switching, throwing, parent handling,
# deallocation, cross-thread behaviour, repr formatting, and a set of
# regression tests for things that used to crash the interpreter.
import gc
import sys
import time
import threading
import unittest

from abc import ABCMeta
from abc import abstractmethod

import greenlet
from greenlet import greenlet as RawGreenlet
from . import TestCase
from . import RUNNING_ON_MANYLINUX
from . import PY313
from .leakcheck import fails_leakcheck


# We manually manage locks in many tests
# pylint:disable=consider-using-with
# pylint:disable=too-many-public-methods
# This module is quite large.
# TODO: Refactor into separate test files. For example,
# put all the regression tests that used to produce
# crashes in test_greenlet_no_crash; put tests that DO deliberately crash
# the interpreter into test_greenlet_crash.
# pylint:disable=too-many-lines

class SomeError(Exception):
    """Marker exception raised and expected by the tests in this module."""
    pass


def fmain(seen):
    """
    Greenlet body used by several tests.

    Switches to the parent immediately. If an exception is thrown into
    the greenlet while it is suspended, the exception *type* is appended
    to *seen* and the exception is re-raised. If the greenlet is instead
    resumed normally, it raises :class:`SomeError`.
    """
    try:
        greenlet.getcurrent().parent.switch()
    except:
        seen.append(sys.exc_info()[0])
        raise
    raise SomeError


def send_exception(g, exc):
    """
    Raise *exc* inside a new child greenlet whose parent is *g*.

    The child raises as soon as it is switched to, so the exception
    propagates from the child to *g*.
    """
    # note: send_exception(g, exc) can be now done with g.throw(exc).
    # the purpose of this test is to explicitly check the propagation rules.
    def crasher(exc):
        raise exc
    g1 = RawGreenlet(crasher, parent=g)
    g1.switch(exc)


class TestGreenlet(TestCase):
    # General behaviour of greenlets: switching, exceptions, deallocation,
    # threads, frames, and reference counting.

    def _do_simple_test(self):
        """Ping-pong between this greenlet and a child, checking the order."""
        lst = []

        def f():
            lst.append(1)
            greenlet.getcurrent().parent.switch()
            lst.append(3)

        g = RawGreenlet(f)
        lst.append(0)
        g.switch()
        lst.append(2)
        g.switch()
        lst.append(4)
        self.assertEqual(lst, list(range(5)))

    def test_simple(self):
        self._do_simple_test()

    def test_switch_no_run_raises_AttributeError(self):
        # A greenlet created without a run function cannot be started.
        g = RawGreenlet()
        with self.assertRaises(AttributeError) as exc:
            g.switch()

        self.assertIn("run", str(exc.exception))

    def test_throw_no_run_raises_AttributeError(self):
        # Same as above, but starting the greenlet via throw().
        g = RawGreenlet()
        with self.assertRaises(AttributeError) as exc:
            g.throw(SomeError)

        self.assertIn("run", str(exc.exception))

    def test_parent_equals_None(self):
        # Passing parent=None means "use the current greenlet".
        g = RawGreenlet(parent=None)
        self.assertIsNotNone(g)
        self.assertIs(g.parent, greenlet.getcurrent())

    def test_run_equals_None(self):
        g = RawGreenlet(run=None)
        self.assertIsNotNone(g)
        self.assertIsNone(g.run)

    def test_two_children(self):
        # Two independent greenlets running the same function share ``lst``
        # but each keeps its own position in ``f``.
        lst = []

        def f():
            lst.append(1)
            greenlet.getcurrent().parent.switch()
            lst.extend([1, 1])
        g = RawGreenlet(f)
        h = RawGreenlet(f)
        g.switch()
        self.assertEqual(len(lst), 1)
        h.switch()
        self.assertEqual(len(lst), 2)
        h.switch()
        self.assertEqual(len(lst), 4)
        self.assertEqual(h.dead, True)
        g.switch()
        self.assertEqual(len(lst), 6)
        self.assertEqual(g.dead, True)

    def test_two_recursive_children(self):
        lst = []

        def f():
            lst.append('b')
            greenlet.getcurrent().parent.switch()

        def g():
            lst.append('a')
            # This local ``g`` shadows the outer name inside the function body.
            g = RawGreenlet(f)
            g.switch()
            lst.append('c')

        g = RawGreenlet(g)
        self.assertEqual(sys.getrefcount(g), 2)
        g.switch()
        self.assertEqual(lst, ['a', 'b', 'c'])
        # Just the one in this frame, plus the one on the stack we pass to the function
        self.assertEqual(sys.getrefcount(g), 2)

    def test_threads(self):
        # The simple ping-pong test must work independently in many threads.
        success = []

        def f():
            self._do_simple_test()
            success.append(True)
        ths = [threading.Thread(target=f) for i in range(10)]
        for th in ths:
            th.start()
        for th in ths:
            th.join(10)
        self.assertEqual(len(success), len(ths))

    def test_exception(self):
        seen = []
        g1 = RawGreenlet(fmain)
        g2 = RawGreenlet(fmain)
        g1.switch(seen)
        g2.switch(seen)
        g2.parent = g1

        self.assertEqual(seen, [])
        #with self.assertRaises(SomeError):
        #    p("***Switching back")
        #    g2.switch()
        # Creating this as a bound method can reveal bugs that
        # are hidden on newer versions of Python that avoid creating
        # bound methods for direct expressions; IOW, don't use the `with`
        # form!
        self.assertRaises(SomeError, g2.switch)
        self.assertEqual(seen, [SomeError])

        # Both greenlets are finished now, so further switches return
        # straight back to us with whatever was passed in.
        value = g2.switch()
        self.assertEqual(value, ())
        self.assertEqual(seen, [SomeError])

        value = g2.switch(25)
        self.assertEqual(value, 25)
        self.assertEqual(seen, [SomeError])

    def test_send_exception(self):
        seen = []
        g1 = RawGreenlet(fmain)
        g1.switch(seen)
        self.assertRaises(KeyError, send_exception, g1, KeyError)
        self.assertEqual(seen, [KeyError])

    def test_dealloc(self):
        # Dropping the last reference to a suspended greenlet throws
        # GreenletExit into it; ``fmain`` records that in ``seen``.
        seen = []
        g1 = RawGreenlet(fmain)
        g2 = RawGreenlet(fmain)
        g1.switch(seen)
        g2.switch(seen)
        self.assertEqual(seen, [])
        del g1
        gc.collect()
        self.assertEqual(seen, [greenlet.GreenletExit])
        del g2
        gc.collect()
        self.assertEqual(seen, [greenlet.GreenletExit, greenlet.GreenletExit])

    def test_dealloc_catches_GreenletExit_throws_other(self):
        def run():
            try:
                greenlet.getcurrent().parent.switch()
            except greenlet.GreenletExit:
                raise SomeError from None

        g = RawGreenlet(run)
        g.switch()
        # Destroying the only reference to the greenlet causes it
        # to get GreenletExit; when it in turn raises, even though we're the parent
        # we don't get the exception, it just gets printed.
        # When we run on 3.8 only, we can use sys.unraisablehook
        oldstderr = sys.stderr
        from io import StringIO
        # Capture stderr so we can inspect the "Exception ignored" report.
        stderr = sys.stderr = StringIO()
        try:
            del g
        finally:
            sys.stderr = oldstderr

        v = stderr.getvalue()
        self.assertIn("Exception", v)
        self.assertIn('ignored', v)
        self.assertIn("SomeError", v)

    @unittest.skipIf(
        PY313 and RUNNING_ON_MANYLINUX,
        "Sometimes flaky (getting one GreenletExit in the second list)"
        # Probably due to funky timing interactions?
        # TODO: FIXME Make that work.
    )
    def test_dealloc_other_thread(self):
        seen = []
        someref = []

        # Events used to sequence the background thread (``f``) with this,
        # the foreground, thread.
        bg_glet_created_running_and_no_longer_ref_in_bg = threading.Event()
        fg_ref_released = threading.Event()
        bg_should_be_clear = threading.Event()
        ok_to_exit_bg_thread = threading.Event()

        def f():
            g1 = RawGreenlet(fmain)
            g1.switch(seen)
            # Hand the only remaining reference over to the foreground thread.
            someref.append(g1)
            del g1
            gc.collect()

            bg_glet_created_running_and_no_longer_ref_in_bg.set()
            fg_ref_released.wait(3)

            RawGreenlet()   # trigger release
            bg_should_be_clear.set()
            ok_to_exit_bg_thread.wait(3)
            RawGreenlet() # One more time

        t = threading.Thread(target=f)
        t.start()
        bg_glet_created_running_and_no_longer_ref_in_bg.wait(10)

        self.assertEqual(seen, [])
        self.assertEqual(len(someref), 1)
        del someref[:]
        gc.collect()
        # g1 is not released immediately because it's from another thread
        self.assertEqual(seen, [])
        fg_ref_released.set()
        bg_should_be_clear.wait(3)
        try:
            self.assertEqual(seen, [greenlet.GreenletExit])
        finally:
            # Always let the background thread finish, even on failure.
            ok_to_exit_bg_thread.set()
            t.join(10)
            del seen[:]
            del someref[:]

    def test_frame(self):
        def f1():
            f = sys._getframe(0) # pylint:disable=protected-access
            # The outermost frame of a greenlet has no caller.
            self.assertEqual(f.f_back, None)
            greenlet.getcurrent().parent.switch(f)
            return "meaning of life"
        g = RawGreenlet(f1)
        frame = g.switch()
        self.assertTrue(frame is g.gr_frame)
        self.assertTrue(g)

        from_g = g.switch()
        self.assertFalse(g)
        self.assertEqual(from_g, 'meaning of life')
        self.assertEqual(g.gr_frame, None)

    def test_thread_bug(self):
        # Two threads each run a greenlet that sleeps; neither should
        # interfere with the other.
        def runner(x):
            g = RawGreenlet(lambda: time.sleep(x))
            g.switch()
        t1 = threading.Thread(target=runner, args=(0.2,))
        t2 = threading.Thread(target=runner, args=(0.3,))
        t1.start()
        t2.start()
        t1.join(10)
        t2.join(10)

    def test_switch_kwargs(self):
        def run(a, b):
            self.assertEqual(a, 4)
            self.assertEqual(b, 2)
            return 42
        x = RawGreenlet(run).switch(a=4, b=2)
        self.assertEqual(x, 42)

    def test_switch_kwargs_to_parent(self):
        # Keyword-only switches arrive as a dict; mixed positional and
        # keyword switches arrive as an ``(args, kwargs)`` tuple.
        def run(x):
            greenlet.getcurrent().parent.switch(x=x)
            greenlet.getcurrent().parent.switch(2, x=3)
            return x, x ** 2
        g = RawGreenlet(run)
        self.assertEqual({'x': 3}, g.switch(3))
        self.assertEqual(((2,), {'x': 3}), g.switch())
        self.assertEqual((3, 9), g.switch())

    def test_switch_to_another_thread(self):
        data = {}
        created_event = threading.Event()
        done_event = threading.Event()

        def run():
            data['g'] = RawGreenlet(lambda: None)
            created_event.set()
            # Keep the owning thread alive while the main thread tries to switch.
            done_event.wait(10)
        thread = threading.Thread(target=run)
        thread.start()
        created_event.wait(10)
        with self.assertRaises(greenlet.error):
            data['g'].switch()
        done_event.set()
        thread.join(10)
        # XXX: Should handle this automatically
        data.clear()

    def test_exc_state(self):
        # The "currently handled exception" state is per-greenlet: a new
        # greenlet started inside an ``except`` block sees none, and ours
        # is intact when we get control back.
        def f():
            try:
                raise ValueError('fun')
            except: # pylint:disable=bare-except
                exc_info = sys.exc_info()
                RawGreenlet(h).switch()
                self.assertEqual(exc_info, sys.exc_info())

        def h():
            self.assertEqual(sys.exc_info(), (None, None, None))

        RawGreenlet(f).switch()

    def test_instance_dict(self):
        def f():
            greenlet.getcurrent().test = 42

        def deldict(g):
            del g.__dict__

        def setdict(g, value):
            g.__dict__ = value

        g = RawGreenlet(f)
        self.assertEqual(g.__dict__, {})
        g.switch()
        self.assertEqual(g.test, 42)
        self.assertEqual(g.__dict__, {'test': 42})
        g.__dict__ = g.__dict__
        self.assertEqual(g.__dict__, {'test': 42})
        # __dict__ can be neither deleted nor set to a non-dict.
        self.assertRaises(TypeError, deldict, g)
        self.assertRaises(TypeError, setdict, g, 42)

    def test_running_greenlet_has_no_run(self):
        has_run = []
        def func():
            has_run.append(
                hasattr(greenlet.getcurrent(), 'run')
            )

        g = RawGreenlet(func)
        g.switch()
        self.assertEqual(has_run, [False])

    def test_deepcopy(self):
        import copy
        self.assertRaises(TypeError, copy.copy, RawGreenlet())
        self.assertRaises(TypeError, copy.deepcopy, RawGreenlet())

    def test_parent_restored_on_kill(self):
        hub = RawGreenlet(lambda: None)
        main = greenlet.getcurrent()
        result = []
        def worker():
            try:
                # Wait to be killed by going back to the test.
                main.switch()
            except greenlet.GreenletExit:
                # Resurrect and switch to parent
                result.append(greenlet.getcurrent().parent)
                result.append(greenlet.getcurrent())
                hub.switch()
        g = RawGreenlet(worker, parent=hub)
        g.switch()
        # delete the only reference, thereby raising GreenletExit
        del g
        self.assertTrue(result)
        self.assertIs(result[0], main)
        self.assertIs(result[1].parent, hub)
        # Delete them, thereby breaking the cycle between the greenlet
        # and the frame, which otherwise would never be collectable
        # XXX: We should be able to automatically fix this.
        del result[:]
        hub = None
        main = None

    def test_parent_return_failure(self):
        # No run causes AttributeError on switch
        g1 = RawGreenlet()
        # Greenlet that implicitly switches to parent
        g2 = RawGreenlet(lambda: None, parent=g1)
        # AttributeError should propagate to us, no fatal errors
        with self.assertRaises(AttributeError):
            g2.switch()

    def test_throw_exception_not_lost(self):
        # An exception raised and swallowed inside ``__getattribute__`` must
        # not replace the exception being thrown into the greenlet.
        class mygreenlet(RawGreenlet):
            def __getattribute__(self, name):
                try:
                    raise Exception # pylint:disable=broad-exception-raised
                except: # pylint:disable=bare-except
                    pass
                return RawGreenlet.__getattribute__(self, name)
        g = mygreenlet(lambda: None)
        self.assertRaises(SomeError, g.throw, SomeError())

    @fails_leakcheck
    def _do_test_throw_to_dead_thread_doesnt_crash(self, wait_for_cleanup=False):
        # Shared body for the two ``test_throw_to_dead_thread_doesnt_crash*``
        # tests. When *wait_for_cleanup* is true the greenlet is allowed to
        # finish in its thread and we wait for pending cleanups before
        # throwing into it.
        result = []
        def worker():
            greenlet.getcurrent().parent.switch()

        def creator():
            g = RawGreenlet(worker)
            g.switch()
            result.append(g)
            if wait_for_cleanup:
                # Let this greenlet eventually be cleaned up.
                g.switch()
                greenlet.getcurrent()
        t = threading.Thread(target=creator)
        t.start()
        t.join(10)
        del t
        # But, depending on the operating system, the thread
        # deallocator may not actually have run yet! So we can't be
        # sure about the error message unless we wait.
        if wait_for_cleanup:
            self.wait_for_pending_cleanups()
        with self.assertRaises(greenlet.error) as exc:
            result[0].throw(SomeError)

        if not wait_for_cleanup:
            s = str(exc.exception)
            self.assertTrue(
                s == "cannot switch to a different thread (which happens to have exited)"
                or 'Cannot switch' in s
            )
        else:
            self.assertEqual(
                str(exc.exception),
                "cannot switch to a different thread (which happens to have exited)",
            )

        if hasattr(result[0].gr_frame, 'clear'):
            # The frame is actually executing (it thinks), we can't clear it.
            with self.assertRaises(RuntimeError):
                result[0].gr_frame.clear()
        # Unfortunately, this doesn't actually clear the references, they're in the
        # fast local array.
        if not wait_for_cleanup:
            # f_locals has no clear method in Python 3.13
            if hasattr(result[0].gr_frame.f_locals, 'clear'):
                result[0].gr_frame.f_locals.clear()
        else:
            self.assertIsNone(result[0].gr_frame)

        del creator
        worker = None
        del result[:]
        # XXX: we ought to be able to automatically fix this.
        # See issue 252
        self.expect_greenlet_leak = True # direct us not to wait for it to go away

    @fails_leakcheck
    def test_throw_to_dead_thread_doesnt_crash(self):
        self._do_test_throw_to_dead_thread_doesnt_crash()

    def test_throw_to_dead_thread_doesnt_crash_wait(self):
        self._do_test_throw_to_dead_thread_doesnt_crash(True)

    @fails_leakcheck
    def test_recursive_startup(self):
        class convoluted(RawGreenlet):
            def __init__(self):
                RawGreenlet.__init__(self)
                self.count = 0

            def __getattribute__(self, name):
                # The first lookup of ``run`` re-enters ``switch`` while the
                # greenlet is still in the middle of being started.
                if name == 'run' and self.count == 0:
                    self.count = 1
                    self.switch(43)
                return RawGreenlet.__getattribute__(self, name)

            def run(self, value):
                while True:
                    self.parent.switch(value)
        g = convoluted()
        self.assertEqual(g.switch(42), 43)
        # Exits the running greenlet, otherwise it leaks
        # XXX: We should be able to automatically fix this
        #g.throw(greenlet.GreenletExit)
        #del g
        self.expect_greenlet_leak = True

    def test_threaded_updatecurrent(self):
        # released when main thread should execute
        lock1 = threading.Lock()
        lock1.acquire()
        # released when another thread should execute
        lock2 = threading.Lock()
        lock2.acquire()
        class finalized(object):
            def __del__(self):
                # happens while in green_updatecurrent() in main greenlet
                # should be very careful not to accidentally call it again
                # at the same time we must make sure another thread executes
                lock2.release()
                lock1.acquire()
                # now ts_current belongs to another thread
        def deallocator():
            greenlet.getcurrent().parent.switch()
        def fthread():
            lock2.acquire()
            greenlet.getcurrent()
            del g[0]
            lock1.release()
            lock2.acquire()
            greenlet.getcurrent()
            lock1.release()
        main = greenlet.getcurrent()
        g = [RawGreenlet(deallocator)]
        g[0].bomb = finalized()
        g[0].switch()
        t = threading.Thread(target=fthread)
        t.start()
        # let another thread grab ts_current and deallocate g[0]
        lock2.release()
        lock1.acquire()
        # this is the corner stone
        # getcurrent() will notice that ts_current belongs to another thread
        # and start the update process, which would notice that g[0] should
        # be deallocated, and that will execute an object's finalizer. Now,
        # that object will let another thread run so it can grab ts_current
        # again, which would likely crash the interpreter if there's no
        # check for this case at the end of green_updatecurrent(). This test
        # passes if getcurrent() returns correct result, but it's likely
        # to randomly crash if it's not anyway.
        self.assertEqual(greenlet.getcurrent(), main)
        # wait for another thread to complete, just in case
        t.join(10)

    def test_dealloc_switch_args_not_lost(self):
        seen = []
        def worker():
            # wait for the value
            value = greenlet.getcurrent().parent.switch()
            # delete all references to ourself
            del worker[0]
            initiator.parent = greenlet.getcurrent().parent
            # switch to main with the value, but because
            # ts_current is the last reference to us we
            # return here immediately, where we resurrect ourself.
            try:
                greenlet.getcurrent().parent.switch(value)
            finally:
                seen.append(greenlet.getcurrent())
        def initiator():
            return 42 # implicitly falls thru to parent

        # The names ``worker`` and ``initiator`` are deliberately rebound
        # from the functions to the greenlets (``worker`` inside a list so
        # that the function body can delete the reference).
        worker = [RawGreenlet(worker)]

        worker[0].switch() # prime worker
        initiator = RawGreenlet(initiator, worker[0])
        value = initiator.switch()
        self.assertTrue(seen)
        self.assertEqual(value, 42)

    def test_tuple_subclass(self):
        # The point of this test is to see what happens when a custom
        # tuple subclass is used as an object passed directly to the C
        # function ``green_switch``; part of ``green_switch`` checks
        # the ``len()`` of the ``args`` tuple, and that can call back
        # into Python. Here, when it calls back into Python, we
        # recursively enter ``green_switch`` again.

        # This test is really only relevant on Python 2. The builtin
        # `apply` function directly passes the given args tuple object
        # to the underlying function, whereas the Python 3 version
        # unpacks and repacks into an actual tuple. This could still
        # happen using the C API on Python 3 though. We should write a
        # builtin version of apply() ourself.
        def _apply(func, a, k):
            func(*a, **k)

        class mytuple(tuple):
            def __len__(self):
                greenlet.getcurrent().switch()
                return tuple.__len__(self)
        args = mytuple()
        kwargs = dict(a=42)
        def switchapply():
            _apply(greenlet.getcurrent().parent.switch, args, kwargs)
        g = RawGreenlet(switchapply)
        self.assertEqual(g.switch(), kwargs)

    def test_abstract_subclasses(self):
        # A greenlet subclass built with ABCMeta enforces abstract methods.
        AbstractSubclass = ABCMeta(
            'AbstractSubclass',
            (RawGreenlet,),
            {'run': abstractmethod(lambda self: None)})

        class BadSubclass(AbstractSubclass):
            pass

        class GoodSubclass(AbstractSubclass):
            def run(self):
                pass

        GoodSubclass() # should not raise
        self.assertRaises(TypeError, BadSubclass)

    def test_implicit_parent_with_threads(self):
        if not gc.isenabled():
            return # cannot test with disabled gc
        N = gc.get_threshold()[0]
        if N < 50:
            return # cannot test with such a small N
        def attempt():
            # Returns True if the scenario was exercised and verified,
            # False if the GC ran at the wrong time and we should retry.
            lock1 = threading.Lock()
            lock1.acquire()
            lock2 = threading.Lock()
            lock2.acquire()
            recycled = [False]
            def another_thread():
                lock1.acquire() # wait for gc
                greenlet.getcurrent() # update ts_current
                lock2.release() # release gc
            t = threading.Thread(target=another_thread)
            t.start()
            class gc_callback(object):
                def __del__(self):
                    lock1.release()
                    lock2.acquire()
                    recycled[0] = True
            class garbage(object):
                def __init__(self):
                    self.cycle = self
                    self.callback = gc_callback()
            l = []
            x = range(N*2)
            current = greenlet.getcurrent()
            g = garbage()
            for _ in x:
                g = None # lose reference to garbage
                if recycled[0]:
                    # gc callback called prematurely
                    t.join(10)
                    return False
                last = RawGreenlet()
                if recycled[0]:
                    break # yes! gc called in green_new
                l.append(last) # increase allocation counter
            else:
                # gc callback not called when expected
                gc.collect()
                if recycled[0]:
                    t.join(10)
                return False
            self.assertEqual(last.parent, current)
            for g in l:
                self.assertEqual(g.parent, current)
            return True
        for _ in range(5):
            if attempt():
                break

    def test_issue_245_reference_counting_subclass_no_threads(self):
        # https://github.com/python-greenlet/greenlet/issues/245
        # Before the fix, this crashed pretty reliably on
        # Python 3.10, at least on macOS; but much less reliably on other
        # interpreters (memory layout must have changed).
        # The threaded test crashed more reliably on more interpreters.
        from greenlet import getcurrent
        from greenlet import GreenletExit

        class Greenlet(RawGreenlet):
            pass

        initial_refs = sys.getrefcount(Greenlet)
        # This has to be an instance variable because
        # Python 2 raises a SyntaxError if we delete a local
        # variable referenced in an inner scope.
        self.glets = [] # pylint:disable=attribute-defined-outside-init

        def greenlet_main():
            try:
                getcurrent().parent.switch()
            except GreenletExit:
                self.glets.append(getcurrent())

        # NOTE(review): the original comment here was truncated ("Before
        # the"); presumably it described the behaviour before the fix for
        # issue 245 -- confirm and complete.
        for _ in range(10):
            Greenlet(greenlet_main).switch()

        del self.glets
        self.assertEqual(sys.getrefcount(Greenlet), initial_refs)

    @unittest.skipIf(
        PY313 and RUNNING_ON_MANYLINUX,
        "The manylinux images appear to hang on this test on 3.13rc2"
        # Or perhaps I just got tired of waiting for the 450s timeout.
        # Still, it shouldn't take anywhere near that long. Does not reproduce in
        # Ubuntu images, on macOS or Windows.
    )
    def test_issue_245_reference_counting_subclass_threads(self):
        # https://github.com/python-greenlet/greenlet/issues/245
        from threading import Thread
        from threading import Event

        from greenlet import getcurrent

        class MyGreenlet(RawGreenlet):
            pass

        glets = []
        ref_cleared = Event()

        def greenlet_main():
            getcurrent().parent.switch()

        def thread_main(greenlet_running_event):
            mine = MyGreenlet(greenlet_main)
            glets.append(mine)
            # The greenlets being deleted must be active
            mine.switch()
            # Don't keep any reference to it in this thread
            del mine
            # Let main know we published our greenlet.
            greenlet_running_event.set()
            # Wait for main to let us know the references are
            # gone and the greenlet objects no longer reachable
            ref_cleared.wait(10)
            # The creating thread must call getcurrent() (or a few other
            # greenlet APIs) because that's when the thread-local list of dead
            # greenlets gets cleared.
            getcurrent()

        # We start with 3 references to the subclass:
        # - This module
        # - Its __mro__
        # - The __subclasses__ attribute of greenlet
        # - (If we call gc.get_referents(), we find four entries, including
        #   some other tuple ``(greenlet)`` that I'm not sure about but must be part
        #   of the machinery.)
        #
        # On Python 3.10 it's often enough to just run 3 threads; on Python 2.7,
        # more threads are needed, and the results are still
        # non-deterministic. Presumably the memory layouts are different
        initial_refs = sys.getrefcount(MyGreenlet)
        thread_ready_events = []
        for _ in range(
                initial_refs + 45
        ):
            event = Event()
            thread = Thread(target=thread_main, args=(event,))
            thread_ready_events.append(event)
            thread.start()


        for done_event in thread_ready_events:
            done_event.wait(10)


        del glets[:]
        ref_cleared.set()
        # Let any other thread run; it will crash the interpreter
        # if not fixed (or silently corrupt memory and we possibly crash
        # later).
        self.wait_for_pending_cleanups()
        self.assertEqual(sys.getrefcount(MyGreenlet), initial_refs)

    def test_falling_off_end_switches_to_unstarted_parent_raises_error(self):
        def no_args():
            return 13

        parent_never_started = RawGreenlet(no_args)

        def leaf():
            return 42

        child = RawGreenlet(leaf, parent_never_started)

        # Because the run function takes no arguments
        with self.assertRaises(TypeError):
            child.switch()

    def test_falling_off_end_switches_to_unstarted_parent_works(self):
        # The child's return value is passed as the argument that starts
        # the not-yet-started parent.
        def one_arg(x):
            return (x, 24)

        parent_never_started = RawGreenlet(one_arg)

        def leaf():
            return 42

        child = RawGreenlet(leaf, parent_never_started)

        result = child.switch()
        self.assertEqual(result, (42, 24))

    def test_switch_to_dead_greenlet_with_unstarted_perverse_parent(self):
        class Parent(RawGreenlet):
            def __getattribute__(self, name):
                if name == 'run':
                    raise SomeError


        parent_never_started = Parent()
        seen = []
        child = RawGreenlet(lambda: seen.append(42), parent_never_started)
        # Because we automatically start the parent when the child is
        # finished
        with self.assertRaises(SomeError):
            child.switch()

        self.assertEqual(seen, [42])

        # The child is dead now; switching to it again goes to the parent,
        # which fails the same way, and the child does not run twice.
        with self.assertRaises(SomeError):
            child.switch()
        self.assertEqual(seen, [42])

    def test_switch_to_dead_greenlet_reparent(self):
        seen = []
        parent_never_started = RawGreenlet(lambda: seen.append(24))
        child = RawGreenlet(lambda: seen.append(42))

        child.switch()
        self.assertEqual(seen, [42])

        child.parent = parent_never_started
        # This actually is the same as switching to the parent.
        result = child.switch()
        self.assertIsNone(result)
        self.assertEqual(seen, [42, 24])

    def test_can_access_f_back_of_suspended_greenlet(self):
        # This tests our frame rewriting to work around Python 3.12+ having
        # some interpreter frames on the C stack. It will crash in the absence
        # of that logic.
        main = greenlet.getcurrent()

        def outer():
            inner()

        def inner():
            main.switch(sys._getframe(0))

        hub = RawGreenlet(outer)
        # start it
        hub.switch()

        # start another greenlet to make sure we aren't relying on
        # anything in `hub` still being on the C stack
        unrelated = RawGreenlet(lambda: None)
        unrelated.switch()

        # now it is suspended
        self.assertIsNotNone(hub.gr_frame)
        self.assertEqual(hub.gr_frame.f_code.co_name, "inner")
        self.assertIsNotNone(hub.gr_frame.f_back)
        self.assertEqual(hub.gr_frame.f_back.f_code.co_name, "outer")
        # The next line is what would crash
        self.assertIsNone(hub.gr_frame.f_back.f_back)

    def test_get_stack_with_nested_c_calls(self):
        from functools import partial
        from . import _test_extension_cpp

        def recurse(v):
            # Recurse through the extension's ``test_call`` on each level
            # until v reaches 0, then suspend.
            if v > 0:
                return v * _test_extension_cpp.test_call(partial(recurse, v - 1))
            return greenlet.getcurrent().parent.switch()

        gr = RawGreenlet(recurse)
        gr.switch(5)
        # Walk the suspended stack from the innermost frame (v == 0) outwards.
        frame = gr.gr_frame
        for i in range(5):
            self.assertEqual(frame.f_locals["v"], i)
            frame = frame.f_back
        self.assertEqual(frame.f_locals["v"], 5)
        self.assertIsNone(frame.f_back)
        self.assertEqual(gr.switch(10), 1200)  # 1200 = 5! * 10

    def test_frames_always_exposed(self):
        # On Python 3.12 this will crash if we don't set the
        # gr_frames_always_exposed attribute. More background:
        # https://github.com/python-greenlet/greenlet/issues/388
        main = greenlet.getcurrent()

        def outer():
            inner(sys._getframe(0))

        def inner(frame):
            main.switch(frame)

        gr = RawGreenlet(outer)
        frame = gr.switch()

        # Do something else to clobber the part of the C stack used by `gr`,
        # so we can't skate by on "it just happened to still be there"
        unrelated = RawGreenlet(lambda: None)
        unrelated.switch()

        self.assertEqual(frame.f_code.co_name, "outer")
        # The next line crashes on 3.12 if we haven't exposed the frames.
        self.assertIsNone(frame.f_back)


class TestGreenletSetParentErrors(TestCase):
    # Error handling when assigning to (or deleting) ``greenlet.parent``.

    def test_threaded_reparent(self):
        data = {}
        created_event = threading.Event()
        done_event = threading.Event()

        def run():
            data['g'] = RawGreenlet(lambda: None)
            created_event.set()
            done_event.wait(10)

        def blank():
            greenlet.getcurrent().parent.switch()

        thread = threading.Thread(target=run)
        thread.start()
        created_event.wait(10)
        g = RawGreenlet(blank)
        g.switch()
        with self.assertRaises(ValueError) as exc:
            g.parent = data['g']
        done_event.set()
        thread.join(10)

        self.assertEqual(str(exc.exception), "parent cannot be on a different thread")

    def test_unexpected_reparenting(self):
        another = []
        def worker():
            g = RawGreenlet(lambda: None)
            another.append(g)
            g.switch()
        t = threading.Thread(target=worker)
        t.start()
        t.join(10)
        # The first time we switch (running g_initialstub(), which is
        # when we look up the run attribute) we attempt to change the
        # parent to one from another thread (which also happens to be
        # dead). ``g_initialstub()`` should detect this and raise a
        # greenlet error.
        #
        # EXCEPT: With the fix for #252, this is actually detected
        # sooner, when setting the parent itself. Prior to that fix,
        # the main greenlet from the background thread kept a valid
        # value for ``run_info``, and appeared to be a valid parent
        # until we actually started the greenlet. But now that it's
        # cleared, this test is catching whether ``green_setparent``
        # can detect the dead thread.
        #
        # Further refactoring once again changes this back to a greenlet.error
        #
        # We need to wait for the cleanup to happen, but we're
        # deliberately leaking a main greenlet here.
        self.wait_for_pending_cleanups(initial_main_greenlets=self.main_greenlets_before_test + 1)

        class convoluted(RawGreenlet):
            def __getattribute__(self, name):
                if name == 'run':
                    self.parent = another[0] # pylint:disable=attribute-defined-outside-init
                return RawGreenlet.__getattribute__(self, name)
        g = convoluted(lambda: None)
        with self.assertRaises(greenlet.error) as exc:
            g.switch()
        self.assertEqual(str(exc.exception),
                         "cannot switch to a different thread (which happens to have exited)")
        del another[:]

    def test_unexpected_reparenting_thread_running(self):
        # Like ``test_unexpected_reparenting``, except the background thread is
        # actually still alive.
        another = []
        switched_to_greenlet = threading.Event()
        keep_main_alive = threading.Event()
        def worker():
            g = RawGreenlet(lambda: None)
            another.append(g)
            g.switch()
            switched_to_greenlet.set()
            keep_main_alive.wait(10)
        class convoluted(RawGreenlet):
            def __getattribute__(self, name):
                if name == 'run':
                    self.parent = another[0] # pylint:disable=attribute-defined-outside-init
                return RawGreenlet.__getattribute__(self, name)

        t = threading.Thread(target=worker)
        t.start()

        switched_to_greenlet.wait(10)
        try:
            g = convoluted(lambda: None)

            with self.assertRaises(greenlet.error) as exc:
                g.switch()
            self.assertIn("Cannot switch to a different thread", str(exc.exception))
            self.assertIn("Expected", str(exc.exception))
            self.assertIn("Current", str(exc.exception))
        finally:
            # Always release the background thread, even on failure.
            keep_main_alive.set()
            t.join(10)
            # XXX: Should handle this automatically.
            del another[:]

    def test_cannot_delete_parent(self):
        worker = RawGreenlet(lambda: None)
        self.assertIs(worker.parent, greenlet.getcurrent())

        with self.assertRaises(AttributeError) as exc:
            del worker.parent
        self.assertEqual(str(exc.exception), "can't delete attribute")

    def test_cannot_delete_parent_of_main(self):
        with self.assertRaises(AttributeError) as exc:
            del greenlet.getcurrent().parent
        self.assertEqual(str(exc.exception), "can't delete attribute")

    def test_main_greenlet_parent_is_none(self):
        # assuming we're in a main greenlet here.
        self.assertIsNone(greenlet.getcurrent().parent)

    def test_set_parent_wrong_types(self):
        def bg():
            # Go back to main.
            greenlet.getcurrent().parent.switch()

        def check(glet):
            # Every non-greenlet value must be rejected with the same message.
            for p in None, 1, self, "42":
                with self.assertRaises(TypeError) as exc:
                    glet.parent = p

                self.assertEqual(
                    str(exc.exception),
                    "GreenletChecker: Expected any type of greenlet, not " + type(p).__name__)

        # First, not running
        g = RawGreenlet(bg)
        self.assertFalse(g)
        check(g)

        # Then when running.
        g.switch()
        self.assertTrue(g)
        check(g)

        # Let it finish
        g.switch()

    def test_trivial_cycle(self):
        glet = RawGreenlet(lambda: None)
        with self.assertRaises(ValueError) as exc:
            glet.parent = glet
        self.assertEqual(str(exc.exception), "cyclic parent chain")

    def test_trivial_cycle_main(self):
        # This used to produce a ValueError, but we catch it earlier than that now.
        with self.assertRaises(AttributeError) as exc:
            greenlet.getcurrent().parent = greenlet.getcurrent()
        self.assertEqual(str(exc.exception), "cannot set the parent of a main greenlet")

    def test_deeper_cycle(self):
        g1 = RawGreenlet(lambda: None)
        g2 = RawGreenlet(lambda: None)
        g3 = RawGreenlet(lambda: None)

        g1.parent = g2
        g2.parent = g3

        # Closing the chain g1 -> g2 -> g3 -> g1 must be rejected.
        with self.assertRaises(ValueError) as exc:
            g3.parent = g1
        self.assertEqual(str(exc.exception), "cyclic parent chain")


class TestRepr(TestCase):
    # The repr of a greenlet ends with a description of its state.

    def assertEndsWith(self, got, suffix):
        """Assert that the string *got* ends with *suffix*."""
        self.assertTrue(got.endswith(suffix), (got, suffix))

    def test_main_while_running(self):
        r = repr(greenlet.getcurrent())
        self.assertEndsWith(r, " current active started main>")

    def test_main_in_background(self):
        main = greenlet.getcurrent()
        def run():
            return repr(main)

        g = RawGreenlet(run)
        r = g.switch()
        self.assertEndsWith(r, ' suspended active started main>')

    def test_initial(self):
        r = repr(RawGreenlet())
        self.assertEndsWith(r, ' pending>')

    def test_main_from_other_thread(self):
        main = greenlet.getcurrent()

        class T(threading.Thread):
            original_main = thread_main = None
            main_glet = None
            def run(self):
                self.original_main = repr(main)
                self.main_glet = greenlet.getcurrent()
                self.thread_main = repr(self.main_glet)

        t = T()
        t.start()
        t.join(10)

        self.assertEndsWith(t.original_main, ' suspended active started main>')
        self.assertEndsWith(t.thread_main, ' current active started main>')
        # give the machinery time to notice the death of the thread,
        # and clean it up. Note that we don't use
        # ``expect_greenlet_leak`` or wait_for_pending_cleanups,
        # because at this point we know we have an extra greenlet
        # still reachable.
        for _ in range(3):
            time.sleep(0.001)

        # In the past, main greenlets, even from dead threads, never
        # really appear dead. We have fixed that, and we also report
        # that the thread is dead in the repr. (Do this multiple times
        # to make sure that we don't self-modify and forget our state
        # in the C++ code).
        for _ in range(3):
            self.assertTrue(t.main_glet.dead)
            r = repr(t.main_glet)
            self.assertEndsWith(r, ' (thread exited) dead>')

    def test_dead(self):
        g = RawGreenlet(lambda: None)
        g.switch()
        self.assertEndsWith(repr(g), ' dead>')
        self.assertNotIn('suspended', repr(g))
        self.assertNotIn('started', repr(g))
        self.assertNotIn('active', repr(g))

    def test_formatting_produces_native_str(self):
        # https://github.com/python-greenlet/greenlet/issues/218
        # %s formatting on Python 2 was producing unicode, not str.

        g_dead = RawGreenlet(lambda: None)
        g_not_started = RawGreenlet(lambda: None)
        g_cur = greenlet.getcurrent()

        for g in g_dead, g_not_started, g_cur:

            self.assertIsInstance(
                '%s' % (g,),
                str
            )
            self.assertIsInstance(
                '%r' % (g,),
                str,
            )


class TestMainGreenlet(TestCase):
    # Tests some implementation details, and relies on some
    # implementation details.

    def _check_current_is_main(self):
        """Assert we are running in a main greenlet and return its type."""
        # implementation detail
        assert 'main' in repr(greenlet.getcurrent())

        t = type(greenlet.getcurrent())
        assert 'main' not in repr(t)
        return t

    def test_main_greenlet_type_can_be_subclassed(self):
        main_type = self._check_current_is_main()
        subclass = type('subclass', (main_type,), {})
        self.assertIsNotNone(subclass)

    def test_main_greenlet_is_greenlet(self):
        self._check_current_is_main()
        self.assertIsInstance(greenlet.getcurrent(), RawGreenlet)


class TestBrokenGreenlets(TestCase):
    # Tests for things that used to, or still do, terminate the interpreter.
    # This often means doing unsavory things.

    def test_failed_to_initialstub(self):
        def func():
            raise AssertionError("Never get here")


        # ``force_switch_error`` makes the stack switch fail on demand.
        g = greenlet._greenlet.UnswitchableGreenlet(func)
        g.force_switch_error = True

        with self.assertRaisesRegex(SystemError,
                                    "Failed to switch stacks into a greenlet for the first time."):
            g.switch()

    def test_failed_to_switch_into_running(self):
        runs = []
        def func():
            runs.append(1)
            greenlet.getcurrent().parent.switch()
            runs.append(2)
            greenlet.getcurrent().parent.switch()
            runs.append(3) # pragma: no cover

        g = greenlet._greenlet.UnswitchableGreenlet(func)
        g.switch()
        self.assertEqual(runs, [1])
        g.switch()
        self.assertEqual(runs, [1, 2])
        g.force_switch_error = True

        with self.assertRaisesRegex(SystemError,
                                    "Failed to switch stacks into a running greenlet."):
            g.switch()

        # If we stopped here, we would fail the leakcheck, because we've left
        # the ``inner_bootstrap()`` C frame and its descendents hanging around,
        # which have a bunch of Python references. They'll never get cleaned up
        # if we don't let the greenlet finish.
        g.force_switch_error = False
        g.switch()
        self.assertEqual(runs, [1, 2, 3])

    def test_failed_to_slp_switch_into_running(self):
        # The script is expected to abort the (child) interpreter process.
        ex = self.assertScriptRaises('fail_slp_switch.py')

        self.assertIn('fail_slp_switch is running', ex.output)
        self.assertIn(ex.returncode, self.get_expected_returncodes_for_aborted_process())

    def test_reentrant_switch_two_greenlets(self):
        # Before we started capturing the arguments in g_switch_finish, this could crash.
        output = self.run_script('fail_switch_two_greenlets.py')
        self.assertIn('In g1_run', output)
        self.assertIn('TRACE', output)
        self.assertIn('LEAVE TRACE', output)
        self.assertIn('Falling off end of main', output)
        self.assertIn('Falling off end of g1_run', output)
        self.assertIn('Falling off end of g2', output)

    def test_reentrant_switch_three_greenlets(self):
        # On debug builds of greenlet, this used to crash with an assertion error;
        # on non-debug versions, it ran fine (which it should not do!).
        # Now it always crashes correctly with a TypeError
        ex = self.assertScriptRaises('fail_switch_three_greenlets.py', exitcodes=(1,))

        self.assertIn('TypeError', ex.output)
        self.assertIn('positional arguments', ex.output)

    def test_reentrant_switch_three_greenlets2(self):
        # This actually passed on debug and non-debug builds. It
        # should probably have been triggering some debug assertions
        # but it didn't.
        #
        # I think the fixes for the above test also kicked in here.
        output = self.run_script('fail_switch_three_greenlets2.py')
        self.assertIn(
            "RESULTS: [('trace', 'switch'), "
            "('trace', 'switch'), ('g2 arg', 'g2 from tracefunc'), "
            "('trace', 'switch'), ('main g1', 'from g2_run'), ('trace', 'switch'), "
            "('g1 arg', 'g1 from main'), ('trace', 'switch'), ('main g2', 'from g1_run'), "
            "('trace', 'switch'), ('g1 from parent', 'g1 from main 2'), ('trace', 'switch'), "
            "('main g1.2', 'g1 done'), ('trace', 'switch'), ('g2 from parent', ()), "
            "('trace', 'switch'), ('main g2.2', 'g2 done')]",
            output
        )

    def test_reentrant_switch_GreenletAlreadyStartedInPython(self):
        output = self.run_script('fail_initialstub_already_started.py')

        self.assertIn(
            "RESULTS: ['Begin C', 'Switch to b from B.__getattribute__ in C', "
            "('Begin B', ()), '_B_run switching to main', ('main from c', 'From B'), "
            "'B.__getattribute__ back from main in C', ('Begin A', (None,)), "
            "('A dead?', True, 'B dead?', True, 'C dead?', False), "
            "'C done', ('main from c.2', None)]",
            output
        )

    def test_reentrant_switch_run_callable_has_del(self):
        output = self.run_script('fail_clearing_run_switches.py')
        self.assertIn(
            "RESULTS ["
            "('G.__getattribute__', 'run'), ('RunCallable', '__del__'), "
            "('main: g.switch()', 'from RunCallable'), ('run_func', 'enter')"
            "]",
            output
        )


if __name__ == '__main__':
    unittest.main()