| 686 | | |
|---|
| 687 | | |
|---|
| 688 | | def test_async_terminate(): |
|---|
| 689 | | raise NotImplementedError |
|---|
| 690 | | |
|---|
| 691 | | |
|---|
| 692 | | def test_simple_daemon_threads(): |
|---|
| 693 | | log.setLevel(logging.INFO) |
|---|
| 694 | | logging.basicConfig(format='%(message)s') |
|---|
| 695 | | import random |
|---|
| 696 | | pool = ThreadPool() |
|---|
| 697 | | Qin = Queue.Queue() |
|---|
| 698 | | Qout = Queue.Queue() |
|---|
| 699 | | |
|---|
| 700 | | n_workers = 5 |
|---|
| 701 | | n_jobs = 15 |
|---|
| 702 | | |
|---|
| 703 | | def sleep_job(jobid): |
|---|
| 704 | | tstart = time.time() |
|---|
| 705 | | s = random.uniform(0.0, 0.5) |
|---|
| 706 | | threadname = threading.currentThread()._threadname |
|---|
| 707 | | log.info(str(jobid)+':'+str(threadname)+':sleep(%f)', s) |
|---|
| 708 | | time.sleep(s) |
|---|
| 709 | | return s, tstart, time.time(), jobid |
|---|
| 710 | | |
|---|
| 711 | | def start_thread(): |
|---|
| 712 | | thr = PoolThread() |
|---|
| 713 | | thr.start(lambda:Qin.get()(), notify_end=False, as_daemon=True) |
|---|
| 714 | | return thr |
|---|
| 715 | | |
|---|
| 716 | | trun_start = time.time() |
|---|
| 717 | | for i in xrange(n_jobs): |
|---|
| 718 | | Qin.put(lambda i=i: Qout.put(sleep_job(i))) |
|---|
| 719 | | |
|---|
| 720 | | pool.set_thread_count(n_workers, start_thread) |
|---|
| 721 | | |
|---|
| 722 | | n_complete = 0 |
|---|
| 723 | | while n_complete < n_jobs: |
|---|
| 724 | | s, tstart, tend, i = Qout.get() |
|---|
| 725 | | tcomplete = time.time() |
|---|
| 726 | | log.info(str(i)+':s=%f,started=%f,finished=%f,completed=%f', |
|---|
| 727 | | s, tstart, tend, tcomplete |
|---|
| 728 | | ) |
|---|
| 729 | | log.info(str(i)+'tcompleted - tend = %f', tcomplete - tend) |
|---|
| 730 | | n_complete += 1 |
|---|
| 731 | | log.info(str(n_complete)+':'+str(bool(n_complete < (n_jobs-1)))) |
|---|
| 732 | | |
|---|
| 733 | | log.info('shutdown:1') |
|---|
| 734 | | _, workers = pool.set_thread_count(0, None) |
|---|
| 735 | | # `terminate` will not work here, all workers are blocked waiting on the |
|---|
| 736 | | # queue. |
|---|
| 737 | | log.info('DURATION: %f', time.time() - trun_start) |
|---|
| 738 | | |
|---|
| 739 | | |
|---|
| 740 | | def test_simple_daemon_threads_get_put(): |
|---|
| 741 | | log.setLevel(logging.DEBUG) |
|---|
| 742 | | logging.basicConfig(format='%(message)s') |
|---|
| 743 | | import random |
|---|
| 744 | | pool = ThreadPool() |
|---|
| 745 | | Qin = Queue.Queue() |
|---|
| 746 | | Qout = Queue.Queue() |
|---|
| 747 | | from functools import partial |
|---|
| 748 | | |
|---|
| 749 | | n_workers = 5 |
|---|
| 750 | | n_jobs = 15 |
|---|
| 751 | | |
|---|
| 752 | | def sleep_job(jobid): |
|---|
| 753 | | tstart = time.time() |
|---|
| 754 | | s = random.uniform(0.0, 0.5) |
|---|
| 755 | | threadname = threading.currentThread()._threadname |
|---|
| 756 | | log.info(str(jobid)+':'+str(threadname)+':sleep(%f)', s) |
|---|
| 757 | | time.sleep(s) |
|---|
| 758 | | return s, tstart, time.time(), jobid |
|---|
| 759 | | |
|---|
| 760 | | |
|---|
| 761 | | def start_thread(): |
|---|
| 762 | | thr = PoolThread() |
|---|
| 763 | | thr.run = thr.run_get_put |
|---|
| 764 | | thr.start(lambda:Qin.get(), notify_end=False, as_daemon=True) |
|---|
| 765 | | return thr |
|---|
| 766 | | |
|---|
| 767 | | trun_start = time.time() |
|---|
| 768 | | for i in xrange(n_jobs): |
|---|
| 769 | | Qin.put((partial(sleep_job, i), Qout.put)) |
|---|
| 770 | | |
|---|
| 771 | | pool.set_thread_count(n_workers, start_thread) |
|---|
| 772 | | |
|---|
| 773 | | n_complete = 0 |
|---|
| 774 | | while n_complete < n_jobs: |
|---|
| 775 | | s, tstart, tend, i = Qout.get() |
|---|
| 776 | | tcomplete = time.time() |
|---|
| 777 | | log.info(str(i)+':s=%f,started=%f,finished=%f,completed=%f', |
|---|
| 778 | | s, tstart, tend, tcomplete |
|---|
| 779 | | ) |
|---|
| 780 | | log.info(str(i)+'tcompleted - tend = %f', tcomplete - tend) |
|---|
| 781 | | n_complete += 1 |
|---|
| 782 | | log.info(str(n_complete)+':'+str(bool(n_complete < (n_jobs-1)))) |
|---|
| 783 | | |
|---|
| 784 | | log.info('shutdown:1') |
|---|
| 785 | | _, workers = pool.set_thread_count(0, None) |
|---|
| 786 | | # `terminate` will not work here, all workers are blocked waiting on the |
|---|
| 787 | | # queue. |
|---|
| 788 | | log.info('DURATION: %f', time.time() - trun_start) |
|---|
| 789 | | |
|---|
| 790 | | |
|---|
| 791 | | def test_terminal_value(): |
|---|
| 792 | | log.setLevel(logging.INFO) |
|---|
| 793 | | logging.basicConfig(format='%(message)s') |
|---|
| 794 | | import random |
|---|
| 795 | | pool = ThreadPool() |
|---|
| 796 | | Qout = Queue.Queue() |
|---|
| 797 | | |
|---|
| 798 | | n_workers = 5 |
|---|
| 799 | | n_jobs = 15 |
|---|
| 800 | | |
|---|
| 801 | | def sleep_job(jobid): |
|---|
| 802 | | tstart = time.time() |
|---|
| 803 | | s = random.uniform(0.0, 0.5) |
|---|
| 804 | | threadname = threading.currentThread()._threadname |
|---|
| 805 | | log.info(str(jobid)+':'+str(threadname)+':sleep(%f)', s) |
|---|
| 806 | | time.sleep(s) |
|---|
| 807 | | return s, tstart, time.time(), jobid |
|---|
| 808 | | |
|---|
| 809 | | terminal=object() |
|---|
| 810 | | thread_inqueues = {} |
|---|
| 811 | | def start_thread(): |
|---|
| 812 | | thr = PoolThread(terminal_value=terminal) |
|---|
| 813 | | Qin = Queue.Queue() |
|---|
| 814 | | def next(): |
|---|
| 815 | | v = Qin.get() |
|---|
| 816 | | if v is terminal: |
|---|
| 817 | | return v |
|---|
| 818 | | return v() |
|---|
| 819 | | |
|---|
| 820 | | thr.start(next, notify_end=False, as_daemon=False) |
|---|
| 821 | | thread_inqueues[thr.getName()]=Qin |
|---|
| 822 | | return thr |
|---|
| 823 | | |
|---|
| 824 | | def thread_end(thread, threadname, threadid): |
|---|
| 825 | | assert thread.getName() == threadname |
|---|
| 826 | | thread_inqueues[threadname].put(terminal) |
|---|
| 827 | | thread.join() |
|---|
| 828 | | |
|---|
| 829 | | pool.set_thread_count(n_workers, start_thread) |
|---|
| 830 | | |
|---|
| 831 | | trun_start = time.time() |
|---|
| 832 | | threadnames = sorted(thread_inqueues.keys(), reverse=True) |
|---|
| 833 | | for i in xrange(n_jobs): |
|---|
| 834 | | if not threadnames: |
|---|
| 835 | | threadnames = sorted(thread_inqueues.keys(), reverse=True) |
|---|
| 836 | | Qin = thread_inqueues[threadnames.pop()] |
|---|
| 837 | | Qin.put(lambda i=i: Qout.put(sleep_job(i))) |
|---|
| 838 | | |
|---|
| 839 | | |
|---|
| 840 | | n_complete = 0 |
|---|
| 841 | | while n_complete < n_jobs: |
|---|
| 842 | | s, tstart, tend, i = Qout.get() |
|---|
| 843 | | tcomplete = time.time() |
|---|
| 844 | | log.info(str(i)+':s=%f,started=%f,finished=%f,completed=%f', |
|---|
| 845 | | s, tstart, tend, tcomplete |
|---|
| 846 | | ) |
|---|
| 847 | | log.info(str(i)+'tcompleted - tend = %f', tcomplete - tend) |
|---|
| 848 | | n_complete += 1 |
|---|
| 849 | | log.info(str(n_complete)+':'+str(bool(n_complete < (n_jobs-1)))) |
|---|
| 850 | | |
|---|
| 851 | | log.info('shutdown:1') |
|---|
| 852 | | _, reaped = pool.set_thread_count(0, None, thread_end=thread_end) |
|---|
| 853 | | log.info('DURATION: %f', time.time() - trun_start) |
|---|
| 854 | | |
|---|
| 855 | | |
|---|
| 856 | | def test_threadteam_basicwork_loop(): |
|---|
| 857 | | terminal = object() |
|---|
| 858 | | n_workers = 5 |
|---|
| 859 | | n_jobs = 17 |
|---|
| 860 | | |
|---|
| 861 | | team = ThreadTeam(terminal_value=terminal) |
|---|
| 862 | | team.set_team_size(n_workers) |
|---|
| 863 | | |
|---|
| 864 | | results = [] |
|---|
| 865 | | for i in xrange(n_jobs): |
|---|
| 866 | | team.delegate(lambda i=i:i, lambda v: results.append(v)) |
|---|
| 867 | | ncomplete = len(results) |
|---|
| 868 | | nissued = n_jobs - len(team.delegate_queue) |
|---|
| 869 | | print '*****\nMAIN LOOP ncomplete %d ...\n' % ncomplete |
|---|
| 870 | | while ncomplete < n_jobs: |
|---|
| 871 | | nc, ni = team.dispatch() |
|---|
| 872 | | ncomplete += nc |
|---|
| 873 | | nissued += ni |
|---|
| 874 | | if not nc: |
|---|
| 875 | | time.sleep(0.1) |
|---|
| 876 | | else: |
|---|
| 877 | | print 'Completed: %d, ncomplete %d' % (nc, ncomplete) |
|---|
| 878 | | |
|---|
| 879 | | assert len(results) == ncomplete, "len(results):%d != ncomplete:%d" % ( |
|---|
| 880 | | len(results), ncomplete |
|---|
| 881 | | ) |
|---|
| 882 | | assert not (set(results) - set([i for i in xrange(n_jobs)])) |
|---|
| 883 | | |
|---|
| 884 | | team.set_team_size(0) |
|---|
| 885 | | |
|---|
| 886 | | print 'ok' |
|---|
| 887 | | |
|---|
| 888 | | |
|---|
| 889 | | def test_threadteam_hot_resize(): |
|---|
| 890 | | terminal = object() |
|---|
| 891 | | n_workers = 7 |
|---|
| 892 | | n_workers_resized = 4 |
|---|
| 893 | | n_jobs = 100 |
|---|
| 894 | | resize_after = 50 |
|---|
| 895 | | |
|---|
| 896 | | team = ThreadTeam(terminal_value=terminal) |
|---|
| 897 | | team.set_team_size(n_workers) |
|---|
| 898 | | |
|---|
| 899 | | results = [] |
|---|
| 900 | | canceled_markers = [object() for i in xrange(n_jobs)] |
|---|
| 901 | | for i in xrange(n_jobs): |
|---|
| 902 | | team.delegate(lambda i=i:i, lambda v: results.append(v), |
|---|
| 903 | | canceled_marker=canceled_markers[i] |
|---|
| 904 | | ) |
|---|
| 905 | | |
|---|
| 906 | | ncomplete = len(results) |
|---|
| 907 | | nissued = n_jobs - len(team.delegate_queue) |
|---|
| 908 | | has_resized = False |
|---|
| 909 | | print '*****\nMAIN LOOP ncomplete %d ...\n' % ncomplete |
|---|
| 910 | | |
|---|
| 911 | | while ncomplete < n_jobs: |
|---|
| 912 | | if has_resized or ncomplete < resize_after: |
|---|
| 913 | | completed, issued = team.dispatch() |
|---|
| 914 | | else: |
|---|
| 915 | | has_resized = True |
|---|
| 916 | | print 'Resizing from %d to %d, %d of %d complete' % ( |
|---|
| 917 | | n_workers, n_workers_resized, ncomplete, n_jobs |
|---|
| 918 | | ) |
|---|
| 919 | | completed, issued, (created, reaped) = team.dispatch( |
|---|
| 920 | | set_team_size=n_workers_resized) |
|---|
| 921 | | assert len(reaped) == (n_workers - n_workers_resized), ( |
|---|
| 922 | | ('len(reaped):%d != ' |
|---|
| 923 | | '(n_workers - n_workers_resized):%d') % ( |
|---|
| 924 | | len(reaped), n_workers - n_workers_resized) |
|---|
| 925 | | ) |
|---|
| 926 | | |
|---|
| 927 | | ncomplete += completed |
|---|
| 928 | | if not completed: |
|---|
| 929 | | time.sleep(0.1) |
|---|
| 930 | | else: |
|---|
| 931 | | print 'Completed: %d, ncomplete %d' % (completed, ncomplete) |
|---|
| 932 | | |
|---|
| 933 | | assert len(results) == ncomplete, "len(results):%d != ncomplete:%d" % ( |
|---|
| 934 | | len(results), ncomplete |
|---|
| 935 | | ) |
|---|
| 936 | | assert not (set(results) - set([i for i in xrange(n_jobs)]) - |
|---|
| 937 | | set(canceled_markers) |
|---|
| 938 | | ) |
|---|
| 939 | | |
|---|
| 940 | | team.set_team_size(0) |
|---|
| 941 | | |
|---|
| 942 | | |
|---|
| 943 | | def ignore(): |
|---|
| 944 | | """A team of worker threads. |
|---|
| 945 | | |
|---|
| 946 | | Instance in your main thread of control and call queue_work to arrange for |
|---|
| 947 | | a callable to be invoked in one of the threads managed by the thread team |
|---|
| 948 | | instance. |
|---|
| 949 | | |
|---|
| 950 | | The public methods of this class should *not* be used concurrently from |
|---|
| 951 | | multiple threads. You may safely transfer owner ship of a threadteam |
|---|
| 952 | | instance from one thread to another, provided that none of the threads |
|---|
| 953 | | that may own the team are themselves team members. |
|---|
| 954 | | |
|---|
| 955 | | The issued_delegates, results and members dictionaries |
|---|
| 956 | | ............................................................... |
|---|
| 957 | | |
|---|
| 958 | | Each thread in the team gets an entry in `members`, `issued_delegates` and |
|---|
| 959 | | `results`. These entries are created as the thread is being created and |
|---|
| 960 | | *before* it is running. These entries are not removed until the associated |
|---|
| 961 | | team thread has terminated. `members` contains a reference to the thread |
|---|
| 962 | | instance and a condition variable that is used to gate access to the |
|---|
| 963 | | threads private - 1 deep - work queue. Items are 'put' to the first |
|---|
| 964 | | available worker in `issue_delegates` by acquiring the associated condition |
|---|
| 965 | | variable in `members` and then setting the application callable as the |
|---|
| 966 | | value for that workers key in `issued_delegates`. The worker 'gets' the |
|---|
| 967 | | work by first acquiring the condition variable in members and then taking |
|---|
| 968 | | the callable from issued_delegates. Before releases the condition variable |
|---|
| 969 | | - having obtained the callable - it first sets its entry in |
|---|
| 970 | | `issued_delegates` to None. |
|---|
| 971 | | |
|---|
| 972 | | `results` is only accessed from the master thread. The master |
|---|
| 973 | | thread is the thread that owns the threadteam instance. |
|---|
| 974 | | |
|---|
| 975 | | The prune of dead keys from members and issued_delegates is *not* guarded. |
|---|
| 976 | | This is deliberate. The implementation *NEVER* sets a value in members or |
|---|
| 977 | | issued_delegates without taking out the condition variable. The worst that |
|---|
| 978 | | could happen when set_team_size causes the dictionaries to resize is that a |
|---|
| 979 | | worker may get None from its issued_delegates entry when there is in fact a |
|---|
| 980 | | callable ready to fire. We do not resize often and if we did trigger this |
|---|
| 981 | | condition it is completely benign, the worker loops *until* it gets a |
|---|
| 982 | | value, and as soon as the dictionary settles down things proceed as normal. |
|---|
| 983 | | |
|---|
| 984 | | Our keys are *always* strings we are thread safe. Even if the keys were |
|---|
| 985 | | some custom instance the chances of corruption are ZERO and the chances of |
|---|
| 986 | | an erroneously failed lookup are very close to zero. Even if we get an |
|---|
| 987 | | erroneously failed lookup the worker will simply get the value the next |
|---|
| 988 | | time around its 'get' loop. |
|---|
| 989 | | |
|---|
| 990 | | We *NEVER* do a read/modify/write on the values. |
|---|
| 991 | | We *NEVER* remove keys which correspond to live threads. |
|---|
| 992 | | |
|---|
| 993 | | http://mail.python.org/pipermail/python-list/2007-June/443324.html |
|---|
| 994 | | http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm |
|---|
| 995 | | http://mail.python.org/pipermail/python-dev/2006-November/069981.html |
|---|
| 996 | | |
|---|
| 997 | | """ |
|---|
| 998 | | |
|---|
| 999 | | if __name__=='__main__': |
|---|
| 1000 | | logging.basicConfig(format='%(message)s :: %(asctime)s') |
|---|
| 1001 | | log.setLevel(logging.DEBUG) |
|---|
| 1002 | | tests=[ |
|---|
| 1003 | | test_simple_daemon_threads, |
|---|
| 1004 | | test_simple_daemon_threads_get_put, |
|---|
| 1005 | | test_terminal_value, |
|---|
| 1006 | | test_threadteam_basicwork_loop, |
|---|
| 1007 | | test_threadteam_hot_resize |
|---|
| 1008 | | ] |
|---|
| 1009 | | passed=[] |
|---|
| 1010 | | try: |
|---|
| 1011 | | for t in tests: |
|---|
| 1012 | | t() |
|---|
| 1013 | | passed.append(t.__name__) |
|---|
| 1014 | | except: |
|---|
| 1015 | | print exc_string() |
|---|
| 1016 | | print 'PASSED:', ', '.join(passed) |
|---|
| 1017 | | if len(passed) < len(tests): |
|---|
| 1018 | | print 'FAILED:', tests[len(passed)].__name__ |
|---|
| 1019 | | if len(passed) < len(tests) - 1: |
|---|
| 1020 | | print 'NOT TESTED:', ', '.join([t.__name__ |
|---|
| 1021 | | for t in tests[len(passed)+1:]]) |
|---|
| 1022 | | sys.exit(-1) |
|---|
| 1023 | | else: |
|---|
| 1024 | | print 'PASSED:', ', '.join(passed) |
|---|
| 1025 | | |
|---|
| | 718 | #eof |
|---|