"""
Helper classes for multiprocessing: a stoppable Process base class, a
queue-driven worker process, and a supervisor that runs a pool of workers.
"""
from Queue import Empty
from multiprocessing import Queue,Process, TimeoutError
from signal import signal, SIGTERM
import time
class NotImpemented(Exception):
    """
    Module-specific exception type.

    NOTE(review): the name is misspelled (presumably "NotImplemented" was
    intended) and nothing in the visible code raises it; it is kept under
    this exact name because external code may import it.
    """
class StopMessage(Exception):
    """
    Marker object posted on a StoppableProcess control queue to request
    shutdown.

    It is used as a queue message (detected with isinstance), not raised,
    even though it derives from Exception.
    """
class StoppableProcess(Process):
    """
    A Process that can be asked to stop through a private control queue.

    join() posts a StopMessage on the control queue.  The running process
    sees it when _processControlQueue() is called, which sets
    self._stopped; run() loops are expected to test that flag.
    """

    def __init__(self):
        # Set up the Process machinery first, then our own state.
        Process.__init__(self)
        self._stopped = False
        self._controlQueue = Queue()

    def join(self, timeout=None):
        """
        Request a stop, then wait for the process to exit.

        timeout -- forwarded unchanged to Process.join
        """
        self._stop()
        print("Shutting down %s %s" % (self.pid, self.__class__.__name__))
        Process.join(self, timeout=timeout)

    def _stop(self):
        """
        Post a StopMessage on the control queue without blocking.
        """
        self._controlQueue.put_nowait(StopMessage())

    def _processControlQueue(self):
        """
        Drain the control queue.

        Needs to be called once in a while to receive messages from
        external processes.  A StopMessage sets self._stopped; every other
        message is handed to _handleControlMessage().  Messages queued
        behind a StopMessage are still dispatched.
        """
        while True:
            # Only the non-blocking get is guarded, so an error raised by
            # a message handler is never mistaken for an empty queue.
            try:
                msg = self._controlQueue.get_nowait()
            except Empty:
                break
            if isinstance(msg, StopMessage):
                self._stopped = True
            else:
                self._handleControlMessage(msg)

    def _handleControlMessage(self, message):
        """
        Hook for subclasses: called with every control message that is
        not a StopMessage.  The default implementation ignores it.
        """
        pass

    def run(self):
        """
        Default main loop: idle, polling the control queue about once a
        second, until a StopMessage has been received.
        """
        while not self._stopped:
            # Without this poll the stop request would never be seen and
            # join() would block forever.
            self._processControlQueue()
            if not self._stopped:
                time.sleep(1)
class WorkerProcess(StoppableProcess):
    """
    A process that is meant to run until it is stopped externally.

    All communication is done via the queues given at initialization
    time: items are read from the work queue, passed to _processItem()
    (which subclasses must implement) and truthy results are posted on the
    result queue.  Exceptions raised while handling an item are recorded,
    together with the input, on a private exception queue.

    Halt the process with join(), which posts a StopMessage on the
    control queue that run() polls between batches of work.
    """

    def __init__(self, workQueue=None, resultQueue=None, interval=.25):
        """
        workQueue   -- queue to read work items from (created if None)
        resultQueue -- queue to post results to (created if None)
        interval    -- seconds to sleep between polling steps
        """
        StoppableProcess.__init__(self)
        # Compare against None so a caller-supplied queue object is never
        # replaced just because it happens to evaluate as false.
        self._workQueue = Queue() if workQueue is None else workQueue
        self._resultQueue = Queue() if resultQueue is None else resultQueue
        self._exceptionQueue = Queue()
        self._interval = interval

    @property
    def results(self):
        """The queue results are posted to."""
        return self._resultQueue

    def run(self):
        """
        Main loop: monitor the work queue and post to the result queue
        until a StopMessage is received on the control queue.

        A falsy work item or a falsy result ends the current batch early
        (nothing is posted for it); the control queue is then polled and
        the loop resumes.
        """
        while not self._stopped:
            while not self._workQueue.empty():
                item, result = None, None
                try:
                    item = self._workQueue.get_nowait()
                    if not item:
                        break
                    result = self._processItem(item)
                    if not result:
                        break
                    self._resultQueue.put_nowait(result)
                    time.sleep(self._interval)
                except (Empty, TimeoutError):
                    # Both exception types must be listed in a tuple;
                    # "except Empty, TimeoutError" would only catch Empty.
                    time.sleep(self._interval)
                    self._processControlQueue()
                    break
                except Exception as ex:
                    details = "\n\t\t\t".join((str(ex),
                                               str(item or "No Work"),
                                               str(result or "No Result")))
                    print("\t\t\tgot an exception...\n%s \n\n\n" % details)
                    self._exceptionQueue.put_nowait(dict(input=item,
                                                         exception=ex))
                    time.sleep(self._interval)
            else:
                # Work queue drained without an early exit: idle briefly.
                time.sleep(self._interval)
            self._processControlQueue()
        # Do not wait for the queues' feeder threads at process exit.
        # NOTE(review): per the multiprocessing docs this can lose data
        # still buffered in a queue - confirm that is acceptable.
        for queue in (self._controlQueue, self._exceptionQueue,
                      self._workQueue, self._resultQueue):
            queue.cancel_join_thread()

    def hasPending(self):
        """Return True if the work queue reports that it is not empty."""
        return not self._workQueue.empty()

    def hasResults(self):
        """Return True if the result queue reports that it is not empty."""
        return not self._resultQueue.empty()

    def put(self, data, block=True, timeout=None):
        """
        Put an item in the work queue.

        block, timeout -- forwarded unchanged to the queue's put()
        """
        self._workQueue.put(data, block=block, timeout=timeout)

    def put_nowait(self, data):
        """Put an item in the work queue without blocking."""
        self._workQueue.put_nowait(data)

    def get(self, block=True, timeout=None):
        """
        Get a result from the result queue.

        block, timeout -- forwarded unchanged to the queue's get()
        """
        return self._resultQueue.get(block=block, timeout=timeout)

    def get_nowait(self):
        """Get a result from the result queue without blocking."""
        return self._resultQueue.get_nowait()

    def _processItem(self, data):
        """
        Turn one work item into a result.

        Abstract: always raises NotImplementedError here so subclasses
        are forced to provide an implementation.
        """
        raise NotImplementedError(
            "this method shouldn't be called directly,"
            "and implemented in a subclass")

    def empty(self):
        """Return True if the result queue reports that it is empty."""
        return self._resultQueue.empty()
class PoolWorker(WorkerProcess):
    """
    A process that starts and supervises a pool of worker processes.

    run() creates poolSize instances of workerClass, all sharing this
    object's work and result queues, then idles until a StopMessage is
    received and finally shuts the workers down.
    """

    def __init__(self,
                 workerClass,
                 workQueue=None,
                 resultQueue=None,
                 poolSize=8):
        """
        workerClass -- callable invoked as workerClass(workQueue,
                       resultQueue); the returned object must offer
                       start(), join(timeout=...), is_alive() and
                       terminate()
        workQueue   -- work queue shared with the workers
        resultQueue -- result queue shared with the workers
        poolSize    -- number of workers to start
        """
        WorkerProcess.__init__(self,
                               workQueue,
                               resultQueue)
        self._poolSize = poolSize
        self._w = None  # list of started workers, filled in by run()
        self._workerClass = workerClass

    def run(self):
        """
        Start the workers, wait for a stop request, then shut them down.

        Each worker is joined with a one second timeout and terminated if
        it is still alive afterwards.  Shutdown also happens when worker
        start-up or the wait loop raises, so started workers are not
        leaked.
        """
        # Sanity checks; they do not depend on the individual worker.
        assert self._workQueue, "Null Work Queue"
        assert self._resultQueue, "Null result queue"
        self._w = []
        try:
            for _ in range(self._poolSize):
                worker = self._workerClass(self._workQueue,
                                           self._resultQueue)
                worker.start()
                self._w.append(worker)
            while not self._stopped:
                # Re-read the interval every pass in case a control
                # message handler changed it.
                self._interval = self._interval or .25
                # At least one interval, so an empty pool does not turn
                # this into a busy loop.
                time.sleep(self._interval * max(len(self._w), 1))
                self._processControlQueue()
        finally:
            for worker in self._w:
                worker.join(timeout=1)
                if worker.is_alive():
                    worker.terminate()
            # Do not wait for the queues' feeder threads at process exit.
            for queue in (self._controlQueue, self._exceptionQueue,
                          self._workQueue, self._resultQueue):
                queue.cancel_join_thread()