Changeset 743

Show
Ignore:
Timestamp:
07/08/07 12:40:19 (2 years ago)
Author:
robin
Message:

tidied up the tests

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • asycamore/trunk/asycamore/threadpool.py

    r736 r743  
    1616log = logging.getLogger(__name__) 
    1717 
     18__all__ = ( 
     19        'ThreadTeam ThreadPool Thread PoolThread' 
     20        ).split() 
     21 
    1822# Be useful in the absence of asycamore 
    1923try: 
     
    5054 
    5155class ThreadTeam(object): 
     56 
     57    """A team of worker threads. 
     58 
     59    Instance in your main thread of control and call queue_work to arrange for 
     60    a callable to be invoked in one of the threads managed by the thread team 
     61    instance. 
     62 
     63    The public methods of this class should *not* be used concurrently from 
     64    multiple threads. You may safely transfer owner ship of a threadteam 
     65    instance from one thread to another, provided that none of the threads 
     66    that may own the team are themselves team members. 
     67 
     68 
     69    The keys on the internal dictionaries are *always* strings. Even if the 
     70    keys were some custom instance the chances of corruption are ZERO and the 
     71    chances of an erroneously failed lookup are very close to zero. Even if we 
     72    get an erroneously failed lookup the worker will simply get the value the 
     73    next time around its 'get' loop. 
     74 
     75    We *NEVER* do a read/modify/write on the values without taking out the 
     76    associated condition variable. 
     77    We *NEVER* remove keys which correspond to live threads. 
     78 
     79    http://mail.python.org/pipermail/python-list/2007-June/443324.html 
     80    http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm 
     81    http://mail.python.org/pipermail/python-dev/2006-November/069981.html 
     82 
     83 
     84    """ 
    5285 
    5386    def __init__(self, terminal_value=object(), cancel_io_wait=None): 
     
    379412 
    380413        return threadname 
    381  
    382414 
    383415 
     
    684716            del running 
    685717            self.end() 
    686  
    687  
    688 def test_async_terminate(): 
    689     raise NotImplementedError 
    690  
    691  
    692 def test_simple_daemon_threads(): 
    693     log.setLevel(logging.INFO) 
    694     logging.basicConfig(format='%(message)s') 
    695     import random 
    696     pool = ThreadPool() 
    697     Qin = Queue.Queue() 
    698     Qout = Queue.Queue() 
    699  
    700     n_workers = 5 
    701     n_jobs = 15 
    702  
    703     def sleep_job(jobid): 
    704         tstart = time.time() 
    705         s = random.uniform(0.0, 0.5) 
    706         threadname = threading.currentThread()._threadname 
    707         log.info(str(jobid)+':'+str(threadname)+':sleep(%f)', s) 
    708         time.sleep(s) 
    709         return s, tstart, time.time(), jobid 
    710  
    711     def start_thread(): 
    712         thr = PoolThread() 
    713         thr.start(lambda:Qin.get()(), notify_end=False, as_daemon=True) 
    714         return thr 
    715  
    716     trun_start = time.time() 
    717     for i in xrange(n_jobs): 
    718         Qin.put(lambda i=i: Qout.put(sleep_job(i))) 
    719  
    720     pool.set_thread_count(n_workers, start_thread) 
    721  
    722     n_complete = 0 
    723     while n_complete < n_jobs: 
    724         s, tstart, tend, i = Qout.get() 
    725         tcomplete = time.time() 
    726         log.info(str(i)+':s=%f,started=%f,finished=%f,completed=%f', 
    727                 s, tstart, tend, tcomplete 
    728                 ) 
    729         log.info(str(i)+'tcompleted - tend = %f', tcomplete - tend) 
    730         n_complete += 1 
    731         log.info(str(n_complete)+':'+str(bool(n_complete < (n_jobs-1)))) 
    732  
    733     log.info('shutdown:1') 
    734     _, workers = pool.set_thread_count(0, None) 
    735     # `terminate` will not work here, all workers are blocked waiting on the 
    736     # queue. 
    737     log.info('DURATION: %f', time.time() - trun_start) 
    738  
    739  
    740 def test_simple_daemon_threads_get_put(): 
    741     log.setLevel(logging.DEBUG) 
    742     logging.basicConfig(format='%(message)s') 
    743     import random 
    744     pool = ThreadPool() 
    745     Qin = Queue.Queue() 
    746     Qout = Queue.Queue() 
    747     from functools import partial 
    748  
    749     n_workers = 5 
    750     n_jobs = 15 
    751  
    752     def sleep_job(jobid): 
    753         tstart = time.time() 
    754         s = random.uniform(0.0, 0.5) 
    755         threadname = threading.currentThread()._threadname 
    756         log.info(str(jobid)+':'+str(threadname)+':sleep(%f)', s) 
    757         time.sleep(s) 
    758         return s, tstart, time.time(), jobid 
    759  
    760  
    761     def start_thread(): 
    762         thr = PoolThread() 
    763         thr.run = thr.run_get_put 
    764         thr.start(lambda:Qin.get(), notify_end=False, as_daemon=True) 
    765         return thr 
    766  
    767     trun_start = time.time() 
    768     for i in xrange(n_jobs): 
    769         Qin.put((partial(sleep_job, i), Qout.put)) 
    770  
    771     pool.set_thread_count(n_workers, start_thread) 
    772  
    773     n_complete = 0 
    774     while n_complete < n_jobs: 
    775         s, tstart, tend, i = Qout.get() 
    776         tcomplete = time.time() 
    777         log.info(str(i)+':s=%f,started=%f,finished=%f,completed=%f', 
    778                 s, tstart, tend, tcomplete 
    779                 ) 
    780         log.info(str(i)+'tcompleted - tend = %f', tcomplete - tend) 
    781         n_complete += 1 
    782         log.info(str(n_complete)+':'+str(bool(n_complete < (n_jobs-1)))) 
    783  
    784     log.info('shutdown:1') 
    785     _, workers = pool.set_thread_count(0, None) 
    786     # `terminate` will not work here, all workers are blocked waiting on the 
    787     # queue. 
    788     log.info('DURATION: %f', time.time() - trun_start) 
    789  
    790  
    791 def test_terminal_value(): 
    792     log.setLevel(logging.INFO) 
    793     logging.basicConfig(format='%(message)s') 
    794     import random 
    795     pool = ThreadPool() 
    796     Qout = Queue.Queue() 
    797  
    798     n_workers = 5 
    799     n_jobs = 15 
    800  
    801     def sleep_job(jobid): 
    802         tstart = time.time() 
    803         s = random.uniform(0.0, 0.5) 
    804         threadname = threading.currentThread()._threadname 
    805         log.info(str(jobid)+':'+str(threadname)+':sleep(%f)', s) 
    806         time.sleep(s) 
    807         return s, tstart, time.time(), jobid 
    808  
    809     terminal=object() 
    810     thread_inqueues = {} 
    811     def start_thread(): 
    812         thr = PoolThread(terminal_value=terminal) 
    813         Qin = Queue.Queue() 
    814         def next(): 
    815             v = Qin.get() 
    816             if v is terminal: 
    817                 return v 
    818             return v() 
    819  
    820         thr.start(next, notify_end=False, as_daemon=False) 
    821         thread_inqueues[thr.getName()]=Qin 
    822         return thr 
    823  
    824     def thread_end(thread, threadname, threadid): 
    825         assert thread.getName() == threadname 
    826         thread_inqueues[threadname].put(terminal) 
    827         thread.join() 
    828  
    829     pool.set_thread_count(n_workers, start_thread) 
    830  
    831     trun_start = time.time() 
    832     threadnames = sorted(thread_inqueues.keys(), reverse=True) 
    833     for i in xrange(n_jobs): 
    834         if not threadnames: 
    835             threadnames = sorted(thread_inqueues.keys(), reverse=True) 
    836         Qin = thread_inqueues[threadnames.pop()] 
    837         Qin.put(lambda i=i: Qout.put(sleep_job(i))) 
    838  
    839  
    840     n_complete = 0 
    841     while n_complete < n_jobs: 
    842         s, tstart, tend, i = Qout.get() 
    843         tcomplete = time.time() 
    844         log.info(str(i)+':s=%f,started=%f,finished=%f,completed=%f', 
    845                 s, tstart, tend, tcomplete 
    846                 ) 
    847         log.info(str(i)+'tcompleted - tend = %f', tcomplete - tend) 
    848         n_complete += 1 
    849         log.info(str(n_complete)+':'+str(bool(n_complete < (n_jobs-1)))) 
    850  
    851     log.info('shutdown:1') 
    852     _, reaped = pool.set_thread_count(0, None, thread_end=thread_end) 
    853     log.info('DURATION: %f', time.time() - trun_start) 
    854  
    855  
    856 def test_threadteam_basicwork_loop(): 
    857     terminal = object() 
    858     n_workers = 5 
    859     n_jobs = 17 
    860  
    861     team = ThreadTeam(terminal_value=terminal) 
    862     team.set_team_size(n_workers) 
    863  
    864     results = [] 
    865     for i in xrange(n_jobs): 
    866         team.delegate(lambda i=i:i, lambda v: results.append(v)) 
    867     ncomplete = len(results) 
    868     nissued = n_jobs - len(team.delegate_queue) 
    869     print '*****\nMAIN LOOP ncomplete %d ...\n' % ncomplete 
    870     while ncomplete < n_jobs: 
    871         nc, ni = team.dispatch() 
    872         ncomplete += nc 
    873         nissued += ni 
    874         if not nc: 
    875             time.sleep(0.1) 
    876         else: 
    877             print 'Completed: %d, ncomplete %d' % (nc, ncomplete) 
    878  
    879     assert len(results) == ncomplete, "len(results):%d != ncomplete:%d" % ( 
    880             len(results), ncomplete 
    881             ) 
    882     assert not (set(results) - set([i for i in xrange(n_jobs)])) 
    883  
    884     team.set_team_size(0) 
    885  
    886     print 'ok' 
    887  
    888  
    889 def test_threadteam_hot_resize(): 
    890     terminal = object() 
    891     n_workers = 7 
    892     n_workers_resized = 4 
    893     n_jobs = 100 
    894     resize_after = 50 
    895  
    896     team = ThreadTeam(terminal_value=terminal) 
    897     team.set_team_size(n_workers) 
    898  
    899     results = [] 
    900     canceled_markers = [object() for i in xrange(n_jobs)] 
    901     for i in xrange(n_jobs): 
    902         team.delegate(lambda i=i:i, lambda v: results.append(v), 
    903                 canceled_marker=canceled_markers[i] 
    904                 ) 
    905  
    906     ncomplete = len(results) 
    907     nissued = n_jobs - len(team.delegate_queue) 
    908     has_resized = False 
    909     print '*****\nMAIN LOOP ncomplete %d ...\n' % ncomplete 
    910  
    911     while ncomplete < n_jobs: 
    912         if has_resized or ncomplete < resize_after: 
    913             completed, issued = team.dispatch() 
    914         else: 
    915             has_resized = True 
    916             print 'Resizing from %d to %d, %d of %d complete' % ( 
    917                     n_workers, n_workers_resized, ncomplete, n_jobs 
    918                     ) 
    919             completed, issued, (created, reaped) = team.dispatch( 
    920                     set_team_size=n_workers_resized) 
    921             assert len(reaped) == (n_workers - n_workers_resized), ( 
    922                     ('len(reaped):%d != ' 
    923                         '(n_workers - n_workers_resized):%d') % ( 
    924                             len(reaped), n_workers - n_workers_resized) 
    925                         ) 
    926  
    927         ncomplete += completed 
    928         if not completed: 
    929             time.sleep(0.1) 
    930         else: 
    931             print 'Completed: %d, ncomplete %d' % (completed, ncomplete) 
    932  
    933     assert len(results) == ncomplete, "len(results):%d != ncomplete:%d" % ( 
    934             len(results), ncomplete 
    935             ) 
    936     assert not (set(results) - set([i for i in xrange(n_jobs)]) - 
    937             set(canceled_markers) 
    938             ) 
    939  
    940     team.set_team_size(0) 
    941  
    942  
    943 def ignore(): 
    944     """A team of worker threads. 
    945  
    946     Instance in your main thread of control and call queue_work to arrange for 
    947     a callable to be invoked in one of the threads managed by the thread team 
    948     instance. 
    949  
    950     The public methods of this class should *not* be used concurrently from 
    951     multiple threads. You may safely transfer owner ship of a threadteam 
    952     instance from one thread to another, provided that none of the threads 
    953     that may own the team are themselves team members. 
    954  
    955     The issued_delegates, results and members dictionaries 
    956     ............................................................... 
    957  
    958     Each thread in the team gets an entry in `members`, `issued_delegates` and 
    959     `results`. These entries are created as the thread is being created and 
    960     *before* it is running. These entries are not removed until the associated 
    961     team thread has terminated.  `members` contains a reference to the thread 
    962     instance and a condition variable that is used to gate access to the 
    963     threads private - 1 deep - work queue. Items are 'put' to the first 
    964     available worker in `issue_delegates` by acquiring the associated condition 
    965     variable in `members` and then setting the application callable as the 
    966     value for that workers key in `issued_delegates`. The worker 'gets' the 
    967     work by first acquiring the condition variable in members and then taking 
    968     the callable from issued_delegates. Before releases the condition variable 
    969     - having obtained the callable - it first sets its entry in 
    970       `issued_delegates` to None. 
    971  
    972     `results` is only accessed from the master thread. The master 
    973     thread is the thread that owns the threadteam instance. 
    974  
    975     The prune of dead keys from members and issued_delegates is *not* guarded. 
    976     This is deliberate. The implementation *NEVER* sets a value in members or 
    977     issued_delegates without taking out the condition variable. The worst that 
    978     could happen when set_team_size causes the dictionaries to resize is that a 
    979     worker may get None from its issued_delegates entry when there is in fact a 
    980     callable ready to fire.  We do not resize often and if we did trigger this 
    981     condition it is completely benign, the worker loops *until* it gets a 
    982     value, and as soon as the dictionary settles down things proceed as normal. 
    983  
    984     Our keys are *always* strings we are thread safe. Even if the keys were 
    985     some custom instance the chances of corruption are ZERO and the chances of 
    986     an erroneously failed lookup are very close to zero. Even if we get an 
    987     erroneously failed lookup the worker will simply get the value the next 
    988     time around its 'get' loop. 
    989  
    990     We *NEVER* do a read/modify/write on the values. 
    991     We *NEVER* remove keys which correspond to live threads. 
    992  
    993     http://mail.python.org/pipermail/python-list/2007-June/443324.html 
    994     http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm 
    995     http://mail.python.org/pipermail/python-dev/2006-November/069981.html 
    996  
    997     """ 
    998  
    999 if __name__=='__main__': 
    1000     logging.basicConfig(format='%(message)s :: %(asctime)s') 
    1001     log.setLevel(logging.DEBUG) 
    1002     tests=[ 
    1003         test_simple_daemon_threads, 
    1004         test_simple_daemon_threads_get_put, 
    1005         test_terminal_value, 
    1006         test_threadteam_basicwork_loop, 
    1007         test_threadteam_hot_resize 
    1008         ] 
    1009     passed=[] 
    1010     try: 
    1011         for t in tests: 
    1012             t() 
    1013             passed.append(t.__name__) 
    1014     except: 
    1015         print exc_string() 
    1016         print 'PASSED:', ', '.join(passed) 
    1017         if len(passed) < len(tests): 
    1018             print 'FAILED:', tests[len(passed)].__name__ 
    1019         if len(passed) < len(tests) - 1: 
    1020             print 'NOT TESTED:', ', '.join([t.__name__ 
    1021                 for t in tests[len(passed)+1:]]) 
    1022         sys.exit(-1) 
    1023     else: 
    1024         print 'PASSED:', ', '.join(passed) 
    1025  
     718#eof