import py |
try: |
from py.magic import greenlet |
except (ImportError, RuntimeError), e: |
py.test.skip(str(e)) |
|
import sys, gc |
from py.test import raises |
try: |
import thread, threading |
except ImportError: |
thread = None |
|
def test_simple(): |
lst = [] |
def f(): |
lst.append(1) |
greenlet.getcurrent().parent.switch() |
lst.append(3) |
g = greenlet(f) |
lst.append(0) |
g.switch() |
lst.append(2) |
g.switch() |
lst.append(4) |
assert lst == range(5) |
|
def test_threads(): |
if not thread: |
py.test.skip("this is a test about thread") |
success = [] |
def f(): |
test_simple() |
success.append(True) |
ths = [threading.Thread(target=f) for i in range(10)] |
for th in ths: |
th.start() |
for th in ths: |
th.join() |
assert len(success) == len(ths) |
|
|
class SomeError(Exception): |
pass |
|
def fmain(seen): |
try: |
greenlet.getcurrent().parent.switch() |
except: |
seen.append(sys.exc_info()[0]) |
raise |
raise SomeError |
|
def test_exception(): |
seen = [] |
g1 = greenlet(fmain) |
g2 = greenlet(fmain) |
g1.switch(seen) |
g2.switch(seen) |
g2.parent = g1 |
assert seen == [] |
raises(SomeError, g2.switch) |
assert seen == [SomeError] |
g2.switch() |
assert seen == [SomeError] |
|
def send_exception(g, exc): |
|
|
def crasher(exc): |
raise exc |
g1 = greenlet(crasher, parent=g) |
g1.switch(exc) |
|
def test_send_exception(): |
seen = [] |
g1 = greenlet(fmain) |
g1.switch(seen) |
raises(KeyError, "send_exception(g1, KeyError)") |
assert seen == [KeyError] |
|
def test_dealloc(): |
seen = [] |
g1 = greenlet(fmain) |
g2 = greenlet(fmain) |
g1.switch(seen) |
g2.switch(seen) |
assert seen == [] |
del g1 |
gc.collect() |
assert seen == [greenlet.GreenletExit] |
del g2 |
gc.collect() |
assert seen == [greenlet.GreenletExit, greenlet.GreenletExit] |
|
def test_dealloc_other_thread(): |
if not thread: |
py.test.skip("this is a test about thread") |
seen = [] |
someref = [] |
lock = thread.allocate_lock() |
lock.acquire() |
lock2 = thread.allocate_lock() |
lock2.acquire() |
def f(): |
g1 = greenlet(fmain) |
g1.switch(seen) |
someref.append(g1) |
del g1 |
gc.collect() |
lock.release() |
lock2.acquire() |
greenlet() |
lock.release() |
lock2.acquire() |
t = threading.Thread(target=f) |
t.start() |
lock.acquire() |
assert seen == [] |
assert len(someref) == 1 |
del someref[:] |
gc.collect() |
|
assert seen == [] |
lock2.release() |
lock.acquire() |
assert seen == [greenlet.GreenletExit] |
lock2.release() |
t.join() |
|
def test_frame(): |
def f1(): |
f = sys._getframe(0) |
assert f.f_back is None |
greenlet.getcurrent().parent.switch(f) |
return "meaning of life" |
g = greenlet(f1) |
frame = g.switch() |
assert frame is g.gr_frame |
assert g |
next = g.switch() |
assert not g |
assert next == "meaning of life" |
assert g.gr_frame is None |
|
def test_thread_bug(): |
if not thread: |
py.test.skip("this is a test about thread") |
import time |
def runner(x): |
g = greenlet(lambda: time.sleep(x)) |
g.switch() |
t1 = threading.Thread(target=runner, args=(0.2,)) |
t2 = threading.Thread(target=runner, args=(0.3,)) |
t1.start() |
t2.start() |
t1.join() |
t2.join() |
|