""" |
Node code for Master. |
""" |
import py |
from py.__.test.rsession.outcome import ReprOutcome |
from py.__.test.rsession import repevent |
|
class MasterNode(object): |
def __init__(self, channel, reporter): |
self.channel = channel |
self.reporter = reporter |
self.pending = [] |
channel.setcallback(self._callback) |
|
def _callback(self, outcome): |
item = self.pending.pop() |
self.receive_result(outcome, item) |
|
def receive_result(self, outcomestring, item): |
repr_outcome = ReprOutcome(outcomestring) |
|
self.reporter(repevent.ReceivedItemOutcome( |
self.channel, item, repr_outcome)) |
|
def send(self, item): |
try: |
if item is StopIteration: |
self.channel.send(42) |
else: |
self.pending.insert(0, item) |
|
self.channel.send(item._get_collector_trail()) |
|
self.reporter(repevent.SendItem(self.channel, item)) |
except IOError: |
print "Sending error, channel IOError" |
print self.channel._getremoteerror() |
|
|
raise |
|
def itemgen(colitems, reporter, keyword, reporterror): |
def rep(x): |
reporterror(reporter, x) |
for x in colitems: |
for y in x._tryiter(reporterror=rep, keyword=keyword): |
yield y |
|
def dispatch_loop(masternodes, itemgenerator, shouldstop, |
waiter = lambda: py.std.time.sleep(0.1), |
max_tasks_per_node=None): |
if not max_tasks_per_node: |
max_tasks_per_node = py.test.config.getvalue("dist_taskspernode") |
all_tests = {} |
while 1: |
try: |
for node in masternodes: |
if len(node.pending) < max_tasks_per_node: |
item = itemgenerator.next() |
all_tests[item] = True |
if shouldstop(): |
for _node in masternodes: |
_node.send(StopIteration) |
return None |
node.send(item) |
except StopIteration: |
break |
waiter() |
return all_tests |
|
|