# Multiprocessing

"""
Some classes useful for multiprocessing, at least I think so
"""
from Queue import Empty
from multiprocessing import Queue,Process, TimeoutError
from signal import signal, SIGTERM
import time
 
class NotImpemented(Exception):
    """
    Plain Exception subclass with no extra behaviour.

    NOTE(review): presumably meant for abstract methods that a subclass
    failed to override -- confirm against callers.  The (misspelled) name
    is the public one, so it is kept as is.
    """
 
class StopMessage(Exception):
    """
    Sentinel sent over a StoppableProcess control queue.

    StoppableProcess._stop() enqueues an instance, and
    StoppableProcess._processControlQueue() reacts to it by setting the
    process' ``_stopped`` flag.
    """
 
class StoppableProcess(Process):
    """
    A Process that can be asked to stop through a private control queue.

    join() posts a StopMessage on the control queue.  The running process
    notices it the next time it calls _processControlQueue(), which sets
    ``_stopped`` and thereby ends the run() loop.  Every other message found
    on the control queue is handed to _handleControlMessage().
    """
    def __init__(self):
        self._stopped = False
        self._controlQueue = Queue()
        Process.__init__(self)

    def join(self, timeout=None):
        """
        Request a stop, announce it on stdout, then wait for the process.

        timeout -- forwarded unchanged to Process.join().
        """
        self._stop()
        # Single pre-formatted string: prints the same text under the
        # Python 2 print statement and the Python 3 print function.
        print("Shutting down %s %s" % (self.pid, self.__class__.__name__))
        Process.join(self, timeout=timeout)

    def _stop(self):
        """
        Post a StopMessage on the control queue.
        """
        self._controlQueue.put_nowait(StopMessage())

    def _processControlQueue(self):
        """
        needs to be called once in a while to receive important
        messages from external processes

        Drains the control queue: a StopMessage sets ``_stopped``, anything
        else is handed off to _handleControlMessage().
        """
        while not self._controlQueue.empty():
            try:
                msg = self._controlQueue.get_nowait()
            except Empty:
                # empty() said there was something, but the get came back
                # empty; give up until the next poll.
                break

            if isinstance(msg, StopMessage):
                self._stopped = True
            else:
                self._handleControlMessage(msg)

    def _handleControlMessage(self, message):
        """
        handle in sub class if you are looking for something
        interesting to respond to
        """
        pass

    def run(self):
        """
        Default main loop: idle until a stop has been requested.

        The control queue is polled on every pass; without that poll the
        ``_stopped`` flag could never change inside this loop and join()
        would wait forever.
        """
        while not self._stopped:
            self._processControlQueue()
            if not self._stopped:
                time.sleep(1)
 
 
class WorkerProcess(StoppableProcess):
    """
    a process that is meant to run until it's stopped externally.
    all communication is done via queues you give to it at
    initialization time.

    Work items are taken from the work queue and passed to _processItem()
    (to be implemented by subclasses); truthy results are posted on the
    result queue.  Exceptions raised while handling an item are printed and
    recorded on a separate exception queue as ``dict(input=..., exception=...)``.

    The control queue is only polled when the work queue looks empty (or a
    get comes back empty), so a stop request posted by join() takes effect
    once the pending work has been drained.
    """
    def __init__(self, workQueue=None, resultQueue=None, interval=.25):
        """
        workQueue   -- queue work items are read from; a new Queue is
                       created when none (or a falsy object) is given
        resultQueue -- queue results are posted to; same defaulting rule
        interval    -- seconds slept after each item and between polls
        """
        self._workQueue = workQueue or Queue()
        self._resultQueue = resultQueue or Queue()
        self._exceptionQueue = Queue()
        self._interval = interval
        self._stopped = False
        StoppableProcess.__init__(self)

    @property
    def results(self):
        """The result queue itself."""
        return self._resultQueue

    def run(self):
        """
        main loop, will monitor workQueue and post to resultQueue
        until a StopMessage is received on the controlQueue
        """
        while not self._stopped:
            while not self._workQueue.empty():
                # Reset per item so the exception report below never shows
                # values left over from a previous item.
                item, result = None, None
                try:
                    item = self._workQueue.get_nowait()

                    # A falsy item ends this pass without sleeping or
                    # polling the control queue.
                    if not item:
                        break

                    result = self._processItem(item)

                    # A falsy result is not posted and ends this pass too.
                    if not result:
                        break

                    self._resultQueue.put_nowait(result)

                    time.sleep(self._interval)

                except (Empty, TimeoutError):
                    # Must be a tuple: "except Empty, TimeoutError" only
                    # catches Empty and rebinds the name TimeoutError.
                    time.sleep(self._interval)
                    self._processControlQueue()
                    break

                except Exception as ex:
                    details = "\n\t\t\t".join((str(ex),
                                               str(item or "No Work"),
                                               str(result or "No Result")))
                    print("\t\t\tgot an exception...\n%s \n\n\n" % details)

                    self._exceptionQueue.put_nowait(dict(input=item,
                                                         exception=ex))
                    time.sleep(self._interval)

            else:
                # Only reached when the inner loop ended because the work
                # queue reported empty (not through a break).
                time.sleep(self._interval)
                self._processControlQueue()

        self._controlQueue.cancel_join_thread()
        self._exceptionQueue.cancel_join_thread()
        self._workQueue.cancel_join_thread()
        self._resultQueue.cancel_join_thread()

    def hasPending(self):
        """True when the work queue does not report itself empty."""
        return not self._workQueue.empty()

    def hasResults(self):
        """True when the result queue does not report itself empty."""
        return not self._resultQueue.empty()

    def put(self, data, block=True, timeout=None):
        """
        put an item in the work queue

        block, timeout -- forwarded to the work queue's put()
        """
        self._workQueue.put(data, block=block, timeout=timeout)

    def put_nowait(self, data):
        """Put an item in the work queue without blocking."""
        self._workQueue.put_nowait(data)

    def get(self, block=True, timeout=None):
        """
        get result from resultQueue

        block, timeout -- forwarded to the result queue's get()
        """
        return self._resultQueue.get(block=block, timeout=timeout)

    def get_nowait(self):
        """Get a result from the result queue without blocking."""
        return self._resultQueue.get_nowait()

    def _processItem(self, data):
        """
        we're abstract so throw an error to force subclass implementation

        Raises NotImplementedError; subclasses override this and return the
        result for ``data``.
        """
        # NotImplemented (the comparison constant) is not an exception and
        # cannot be raised; NotImplementedError is the exception type.
        raise NotImplementedError("""this method shouldn't be
        called directly,and implemented in a subclass""")

    def empty(self):
        """
        True when the result queue reports itself empty.

        The ``self is None`` guard is kept from the original for
        compatibility; it only matters if the function is called with None
        in place of an instance.
        """
        if self is not None:
            return self._resultQueue.empty()
        else:
            print("self is None")
            return True
 
class PoolWorker(WorkerProcess):
    """
    Supervisor process for a fixed-size group of worker processes.

    run() starts ``poolSize`` instances of ``workerClass``, all sharing this
    object's work and result queues, then idles (polling the control queue)
    until stopped.  On shutdown every worker is joined with a one second
    timeout and terminated if it is still alive.
    """
    def __init__(self,
                 workerClass,
                 workQueue=None,
                 resultQueue=None,
                 poolSize=8):
        """
        workerClass -- called as workerClass(workQueue, resultQueue) to
                       build each worker
        workQueue   -- passed on to WorkerProcess.__init__
        resultQueue -- passed on to WorkerProcess.__init__
        poolSize    -- number of workers started by run()
        """
        WorkerProcess.__init__(self, workQueue, resultQueue)
        self._workerClass = workerClass
        self._poolSize = poolSize
        # Filled with the started workers once run() executes.
        self._w = None

    def run(self):
        """
        Start the workers, wait for a stop request, then shut them down.
        """
        self._w = []

        for _ in range(self._poolSize):
            assert self._workQueue,"Null Work Queue"
            assert self._resultQueue,"Null result queue"
            worker = self._workerClass(self._workQueue,
                                       self._resultQueue)
            worker.start()
            self._w.append(worker)

        while not self._stopped:
            # Fall back to .25 whenever the interval is falsy, then sleep
            # for interval x number of workers before polling for control
            # messages.
            self._interval = self._interval or .25
            time.sleep(self._interval*len(self._w))
            self._processControlQueue()

        for worker in self._w:
            worker.join(timeout=1)
            if worker.is_alive():
                worker.terminate()

        for queue in (self._controlQueue,
                      self._exceptionQueue,
                      self._workQueue,
                      self._resultQueue):
            queue.cancel_join_thread()