404

[ Avaa Bypassed ]




Upload:

Command:

botdev@18.222.30.84: ~ $
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.python.threadpool}
"""

from __future__ import division, absolute_import

import pickle, time, weakref, gc, threading

from twisted.python.compat import range

from twisted.trial import unittest
from twisted.python import threadpool, threadable, failure, context
from twisted._threads import Team, createMemoryWorker

#
# See the end of this module for the remainder of the imports.
#

class Synchronization(object):
    failures = 0

    def __init__(self, N, waiting):
        self.N = N
        self.waiting = waiting
        self.lock = threading.Lock()
        self.runs = []

    def run(self):
        # This is the testy part: this is supposed to be invoked
        # serially from multiple threads.  If that is actually the
        # case, we will never fail to acquire this lock.  If it is
        # *not* the case, we might get here while someone else is
        # holding the lock.
        if self.lock.acquire(False):
            if not len(self.runs) % 5:
                time.sleep(0.0002) # Constant selected based on
                                   # empirical data to maximize the
                                   # chance of a quick failure if this
                                   # code is broken.
            self.lock.release()
        else:
            self.failures += 1

        # This is just the only way I can think of to wake up the test
        # method.  It doesn't actually have anything to do with the
        # test.
        self.lock.acquire()
        self.runs.append(None)
        if len(self.runs) == self.N:
            self.waiting.release()
        self.lock.release()

    synchronized = ["run"]
threadable.synchronize(Synchronization)



class ThreadPoolTests(unittest.SynchronousTestCase):
    """
    Test threadpools.
    """

    def getTimeout(self):
        """
        Return number of seconds to wait before giving up.
        """
        return 5 # Really should be order of magnitude less


    def _waitForLock(self, lock):
        items = range(1000000)
        for i in items:
            if lock.acquire(False):
                break
            time.sleep(1e-5)
        else:
            self.fail("A long time passed without succeeding")


    def test_attributes(self):
        """
        L{ThreadPool.min} and L{ThreadPool.max} are set to the values passed to
        L{ThreadPool.__init__}.
        """
        pool = threadpool.ThreadPool(12, 22)
        self.assertEqual(pool.min, 12)
        self.assertEqual(pool.max, 22)


    def test_start(self):
        """
        L{ThreadPool.start} creates the minimum number of threads specified.
        """
        pool = threadpool.ThreadPool(0, 5)
        pool.start()
        self.addCleanup(pool.stop)
        self.assertEqual(len(pool.threads), 0)

        pool = threadpool.ThreadPool(3, 10)
        self.assertEqual(len(pool.threads), 0)
        pool.start()
        self.addCleanup(pool.stop)
        self.assertEqual(len(pool.threads), 3)


    def test_adjustingWhenPoolStopped(self):
        """
        L{ThreadPool.adjustPoolsize} only modifies the pool size and does not
        start new workers while the pool is not running.
        """
        pool = threadpool.ThreadPool(0, 5)
        pool.start()
        pool.stop()
        pool.adjustPoolsize(2)
        self.assertEqual(len(pool.threads), 0)


    def test_threadCreationArguments(self):
        """
        Test that creating threads in the threadpool with application-level
        objects as arguments doesn't results in those objects never being
        freed, with the thread maintaining a reference to them as long as it
        exists.
        """
        tp = threadpool.ThreadPool(0, 1)
        tp.start()
        self.addCleanup(tp.stop)

        # Sanity check - no threads should have been started yet.
        self.assertEqual(tp.threads, [])

        # Here's our function
        def worker(arg):
            pass
        # weakref needs an object subclass
        class Dumb(object):
            pass
        # And here's the unique object
        unique = Dumb()

        workerRef = weakref.ref(worker)
        uniqueRef = weakref.ref(unique)

        # Put some work in
        tp.callInThread(worker, unique)

        # Add an event to wait completion
        event = threading.Event()
        tp.callInThread(event.set)
        event.wait(self.getTimeout())

        del worker
        del unique
        gc.collect()
        self.assertIsNone(uniqueRef())
        self.assertIsNone(workerRef())


    def test_threadCreationArgumentsCallInThreadWithCallback(self):
        """
        As C{test_threadCreationArguments} above, but for
        callInThreadWithCallback.
        """

        tp = threadpool.ThreadPool(0, 1)
        tp.start()
        self.addCleanup(tp.stop)

        # Sanity check - no threads should have been started yet.
        self.assertEqual(tp.threads, [])

        # this holds references obtained in onResult
        refdict = {} # name -> ref value

        onResultWait = threading.Event()
        onResultDone = threading.Event()

        resultRef = []

        # result callback
        def onResult(success, result):
            onResultWait.wait(self.getTimeout())
            refdict['workerRef'] = workerRef()
            refdict['uniqueRef'] = uniqueRef()
            onResultDone.set()
            resultRef.append(weakref.ref(result))

        # Here's our function
        def worker(arg, test):
            return Dumb()

        # weakref needs an object subclass
        class Dumb(object):
            pass

        # And here's the unique object
        unique = Dumb()

        onResultRef = weakref.ref(onResult)
        workerRef = weakref.ref(worker)
        uniqueRef = weakref.ref(unique)

        # Put some work in
        tp.callInThreadWithCallback(onResult, worker, unique, test=unique)

        del worker
        del unique

        # let onResult collect the refs
        onResultWait.set()
        # wait for onResult
        onResultDone.wait(self.getTimeout())
        gc.collect()

        self.assertIsNone(uniqueRef())
        self.assertIsNone(workerRef())

        # XXX There's a race right here - has onResult in the worker thread
        # returned and the locals in _worker holding it and the result been
        # deleted yet?

        del onResult
        gc.collect()
        self.assertIsNone(onResultRef())
        self.assertIsNone(resultRef[0]())


    def test_persistence(self):
        """
        Threadpools can be pickled and unpickled, which should preserve the
        number of threads and other parameters.
        """
        pool = threadpool.ThreadPool(7, 20)

        self.assertEqual(pool.min, 7)
        self.assertEqual(pool.max, 20)

        # check that unpickled threadpool has same number of threads
        copy = pickle.loads(pickle.dumps(pool))

        self.assertEqual(copy.min, 7)
        self.assertEqual(copy.max, 20)


    def _threadpoolTest(self, method):
        """
        Test synchronization of calls made with C{method}, which should be
        one of the mechanisms of the threadpool to execute work in threads.
        """
        # This is a schizophrenic test: it seems to be trying to test
        # both the callInThread()/dispatch() behavior of the ThreadPool as well
        # as the serialization behavior of threadable.synchronize().  It
        # would probably make more sense as two much simpler tests.
        N = 10

        tp = threadpool.ThreadPool()
        tp.start()
        self.addCleanup(tp.stop)

        waiting = threading.Lock()
        waiting.acquire()
        actor = Synchronization(N, waiting)

        for i in range(N):
            method(tp, actor)

        self._waitForLock(waiting)

        self.assertFalse(actor.failures, "run() re-entered %d times" %
                                    (actor.failures,))


    def test_callInThread(self):
        """
        Call C{_threadpoolTest} with C{callInThread}.
        """
        return self._threadpoolTest(
            lambda tp, actor: tp.callInThread(actor.run))


    def test_callInThreadException(self):
        """
        L{ThreadPool.callInThread} logs exceptions raised by the callable it
        is passed.
        """
        class NewError(Exception):
            pass

        def raiseError():
            raise NewError()

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThread(raiseError)
        tp.start()
        tp.stop()

        errors = self.flushLoggedErrors(NewError)
        self.assertEqual(len(errors), 1)


    def test_callInThreadWithCallback(self):
        """
        L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
        two-tuple of C{(True, result)} where C{result} is the value returned
        by the callable supplied.
        """
        waiter = threading.Lock()
        waiter.acquire()

        results = []

        def onResult(success, result):
            waiter.release()
            results.append(success)
            results.append(result)

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, lambda: "test")
        tp.start()

        try:
            self._waitForLock(waiter)
        finally:
            tp.stop()

        self.assertTrue(results[0])
        self.assertEqual(results[1], "test")


    def test_callInThreadWithCallbackExceptionInCallback(self):
        """
        L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
        two-tuple of C{(False, failure)} where C{failure} represents the
        exception raised by the callable supplied.
        """
        class NewError(Exception):
            pass

        def raiseError():
            raise NewError()

        waiter = threading.Lock()
        waiter.acquire()

        results = []

        def onResult(success, result):
            waiter.release()
            results.append(success)
            results.append(result)

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, raiseError)
        tp.start()

        try:
            self._waitForLock(waiter)
        finally:
            tp.stop()

        self.assertFalse(results[0])
        self.assertIsInstance(results[1], failure.Failure)
        self.assertTrue(issubclass(results[1].type, NewError))


    def test_callInThreadWithCallbackExceptionInOnResult(self):
        """
        L{ThreadPool.callInThreadWithCallback} logs the exception raised by
        C{onResult}.
        """
        class NewError(Exception):
            pass

        waiter = threading.Lock()
        waiter.acquire()

        results = []

        def onResult(success, result):
            results.append(success)
            results.append(result)
            raise NewError()

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, lambda : None)
        tp.callInThread(waiter.release)
        tp.start()

        try:
            self._waitForLock(waiter)
        finally:
            tp.stop()

        errors = self.flushLoggedErrors(NewError)
        self.assertEqual(len(errors), 1)

        self.assertTrue(results[0])
        self.assertIsNone(results[1])


    def test_callbackThread(self):
        """
        L{ThreadPool.callInThreadWithCallback} calls the function it is
        given and the C{onResult} callback in the same thread.
        """
        threadIds = []

        event = threading.Event()

        def onResult(success, result):
            threadIds.append(threading.currentThread().ident)
            event.set()

        def func():
            threadIds.append(threading.currentThread().ident)

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, func)
        tp.start()
        self.addCleanup(tp.stop)

        event.wait(self.getTimeout())
        self.assertEqual(len(threadIds), 2)
        self.assertEqual(threadIds[0], threadIds[1])


    def test_callbackContext(self):
        """
        The context L{ThreadPool.callInThreadWithCallback} is invoked in is
        shared by the context the callable and C{onResult} callback are
        invoked in.
        """
        myctx = context.theContextTracker.currentContext().contexts[-1]
        myctx['testing'] = 'this must be present'

        contexts = []

        event = threading.Event()

        def onResult(success, result):
            ctx = context.theContextTracker.currentContext().contexts[-1]
            contexts.append(ctx)
            event.set()

        def func():
            ctx = context.theContextTracker.currentContext().contexts[-1]
            contexts.append(ctx)

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, func)
        tp.start()
        self.addCleanup(tp.stop)

        event.wait(self.getTimeout())

        self.assertEqual(len(contexts), 2)
        self.assertEqual(myctx, contexts[0])
        self.assertEqual(myctx, contexts[1])


    def test_existingWork(self):
        """
        Work added to the threadpool before its start should be executed once
        the threadpool is started: this is ensured by trying to release a lock
        previously acquired.
        """
        waiter = threading.Lock()
        waiter.acquire()

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThread(waiter.release) # before start()
        tp.start()

        try:
            self._waitForLock(waiter)
        finally:
            tp.stop()


    def test_workerStateTransition(self):
        """
        As the worker receives and completes work, it transitions between
        the working and waiting states.
        """
        pool = threadpool.ThreadPool(0, 1)
        pool.start()
        self.addCleanup(pool.stop)

        # sanity check
        self.assertEqual(pool.workers, 0)
        self.assertEqual(len(pool.waiters), 0)
        self.assertEqual(len(pool.working), 0)

        # fire up a worker and give it some 'work'
        threadWorking = threading.Event()
        threadFinish = threading.Event()

        def _thread():
            threadWorking.set()
            threadFinish.wait(10)

        pool.callInThread(_thread)
        threadWorking.wait(10)
        self.assertEqual(pool.workers, 1)
        self.assertEqual(len(pool.waiters), 0)
        self.assertEqual(len(pool.working), 1)

        # finish work, and spin until state changes
        threadFinish.set()
        while not len(pool.waiters):
            time.sleep(0.0005)

        # make sure state changed correctly
        self.assertEqual(len(pool.waiters), 1)
        self.assertEqual(len(pool.working), 0)



class RaceConditionTests(unittest.SynchronousTestCase):

    def setUp(self):
        self.threadpool = threadpool.ThreadPool(0, 10)
        self.event = threading.Event()
        self.threadpool.start()
        def done():
            self.threadpool.stop()
            del self.threadpool
        self.addCleanup(done)


    def getTimeout(self):
        """
        A reasonable number of seconds to time out.
        """
        return 5


    def test_synchronization(self):
        """
        If multiple threads are waiting on an event (via blocking on something
        in a callable passed to L{threadpool.ThreadPool.callInThread}), and
        there is spare capacity in the threadpool, sending another callable
        which will cause those to un-block to
        L{threadpool.ThreadPool.callInThread} will reliably run that callable
        and un-block the blocked threads promptly.

        @note: This is not really a unit test, it is a stress-test.  You may
            need to run it with C{trial -u} to fail reliably if there is a
            problem.  It is very hard to regression-test for this particular
            bug - one where the thread pool may consider itself as having
            "enough capacity" when it really needs to spin up a new thread if
            it possibly can - in a deterministic way, since the bug can only be
            provoked by subtle race conditions.
        """
        timeout = self.getTimeout()
        self.threadpool.callInThread(self.event.set)
        self.event.wait(timeout)
        self.event.clear()
        for i in range(3):
            self.threadpool.callInThread(self.event.wait)
        self.threadpool.callInThread(self.event.set)
        self.event.wait(timeout)
        if not self.event.isSet():
            self.event.set()
            self.fail(
                "'set' did not run in thread; timed out waiting on 'wait'."
            )


    def test_singleThread(self):
        """
        The submission of a new job to a thread pool in response to the
        C{onResult} callback does not cause a new thread to be added to the
        thread pool.

        This requires that the thread which calls C{onResult} to have first
        marked itself as available so that when the new job is queued, that
        thread may be considered to run it.  This is desirable so that when
        only N jobs are ever being executed in the thread pool at once only
        N threads will ever be created.
        """
        # Ensure no threads running
        self.assertEqual(self.threadpool.workers, 0)

        event = threading.Event()
        event.clear()

        def onResult(success, counter):
            event.set()

        for i in range(10):
            self.threadpool.callInThreadWithCallback(
                onResult, lambda: None)
            event.wait(10)
            event.clear()

        self.assertEqual(self.threadpool.workers, 1)



class MemoryPool(threadpool.ThreadPool):
    """
    A deterministic threadpool that uses in-memory data structures to queue
    work rather than threads to execute work.
    """

    def __init__(self, coordinator, failTest, newWorker, *args, **kwargs):
        """
        Initialize this L{MemoryPool} with a test case.

        @param coordinator: a worker used to coordinate work in the L{Team}
            underlying this threadpool.
        @type coordinator: L{twisted._threads.IExclusiveWorker}

        @param failTest: A 1-argument callable taking an exception and raising
            a test-failure exception.
        @type failTest: 1-argument callable taking (L{Failure}) and raising
            L{unittest.FailTest}.

        @param newWorker: a 0-argument callable that produces a new
            L{twisted._threads.IWorker} provider on each invocation.
        @type newWorker: 0-argument callable returning
            L{twisted._threads.IWorker}.
        """
        self._coordinator = coordinator
        self._failTest = failTest
        self._newWorker = newWorker
        threadpool.ThreadPool.__init__(self, *args, **kwargs)


    def _pool(self, currentLimit, threadFactory):
        """
        Override testing hook to create a deterministic threadpool.

        @param currentLimit: A 1-argument callable which returns the current
            threadpool size limit.

        @param threadFactory: ignored in this invocation; a 0-argument callable
            that would produce a thread.

        @return: a L{Team} backed by the coordinator and worker passed to
            L{MemoryPool.__init__}.
        """
        def respectLimit():
            # The expression in this method copied and pasted from
            # twisted.threads._pool, which is unfortunately bound up
            # with lots of actual-threading stuff.
            stats = team.statistics()
            if (stats.busyWorkerCount + stats.idleWorkerCount
                >= currentLimit()):
                return None
            return self._newWorker()
        team = Team(coordinator=self._coordinator,
                    createWorker=respectLimit,
                    logException=self._failTest)
        return team



class PoolHelper(object):
    """
    A L{PoolHelper} constructs a L{threadpool.ThreadPool} that doesn't actually
    use threads, by using the internal interfaces in L{twisted._threads}.

    @ivar performCoordination: a 0-argument callable that will perform one unit
        of "coordination" - work involved in delegating work to other threads -
        and return L{True} if it did any work, L{False} otherwise.

    @ivar workers: the workers which represent the threads within the pool -
        the workers other than the coordinator.
    @type workers: L{list} of 2-tuple of (L{IWorker}, C{workPerformer}) where
        C{workPerformer} is a 0-argument callable like C{performCoordination}.

    @ivar threadpool: a modified L{threadpool.ThreadPool} to test.
    @type threadpool: L{MemoryPool}
    """

    def __init__(self, testCase, *args, **kwargs):
        """
        Create a L{PoolHelper}.

        @param testCase: a test case attached to this helper.

        @type args: The arguments passed to a L{threadpool.ThreadPool}.

        @type kwargs: The arguments passed to a L{threadpool.ThreadPool}
        """
        coordinator, self.performCoordination = createMemoryWorker()
        self.workers = []
        def newWorker():
            self.workers.append(createMemoryWorker())
            return self.workers[-1][0]
        self.threadpool = MemoryPool(coordinator, testCase.fail, newWorker,
                                     *args, **kwargs)


    def performAllCoordination(self):
        """
        Perform all currently scheduled "coordination", which is the work
        involved in delegating work to other threads.
        """
        while self.performCoordination():
            pass



class MemoryBackedTests(unittest.SynchronousTestCase):
    """
    Tests using L{PoolHelper} to deterministically test properties of the
    threadpool implementation.
    """

    def test_workBeforeStarting(self):
        """
        If a threadpool is told to do work before starting, then upon starting
        up, it will start enough workers to handle all of the enqueued work
        that it's been given.
        """
        helper = PoolHelper(self, 0, 10)
        n = 5
        for x in range(n):
            helper.threadpool.callInThread(lambda: None)
        helper.performAllCoordination()
        self.assertEqual(helper.workers, [])
        helper.threadpool.start()
        helper.performAllCoordination()
        self.assertEqual(len(helper.workers), n)


    def test_tooMuchWorkBeforeStarting(self):
        """
        If the amount of work before starting exceeds the maximum number of
        threads allowed to the threadpool, only the maximum count will be
        started.
        """
        helper = PoolHelper(self, 0, 10)
        n = 50
        for x in range(n):
            helper.threadpool.callInThread(lambda: None)
        helper.performAllCoordination()
        self.assertEqual(helper.workers, [])
        helper.threadpool.start()
        helper.performAllCoordination()
        self.assertEqual(len(helper.workers), helper.threadpool.max)


Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 103 B 0644
cert.pem.no_trailing_newline File 1.38 KB 0644
crash_test_dummy.py File 543 B 0644
iosim.py File 17.3 KB 0644
key.pem.no_trailing_newline File 1.67 KB 0644
mock_win32process.py File 1.46 KB 0644
myrebuilder1.py File 158 B 0644
myrebuilder2.py File 158 B 0644
plugin_basic.py File 943 B 0644
plugin_extra1.py File 407 B 0644
plugin_extra2.py File 579 B 0644
process_cmdline.py File 162 B 0644
process_echoer.py File 214 B 0644
process_fds.py File 945 B 0644
process_getargv.py File 283 B 0644
process_getenv.py File 268 B 0644
process_linger.py File 286 B 0644
process_reader.py File 188 B 0644
process_signal.py File 214 B 0644
process_stdinreader.py File 857 B 0644
process_tester.py File 1.01 KB 0644
process_tty.py File 130 B 0644
process_twisted.py File 1.18 KB 0644
proto_helpers.py File 26.33 KB 0644
raiser.c File 93.05 KB 0644
raiser.cpython-36m-x86_64-linux-gnu.so File 19.16 KB 0644
raiser.pyx File 466 B 0644
reflect_helper_IE.py File 61 B 0644
reflect_helper_VE.py File 82 B 0644
reflect_helper_ZDE.py File 47 B 0644
server.pem File 4.34 KB 0644
ssl_helpers.py File 1.01 KB 0644
stdio_test_consumer.py File 1.19 KB 0644
stdio_test_halfclose.py File 1.89 KB 0644
stdio_test_hostpeer.py File 1021 B 0644
stdio_test_lastwrite.py File 1.18 KB 0644
stdio_test_loseconn.py File 1.51 KB 0644
stdio_test_producer.py File 1.47 KB 0644
stdio_test_write.py File 923 B 0644
stdio_test_writeseq.py File 915 B 0644
test_abstract.py File 3.42 KB 0644
test_adbapi.py File 25.53 KB 0644
test_amp.py File 107.96 KB 0644
test_application.py File 32.05 KB 0644
test_compat.py File 27.32 KB 0644
test_context.py File 1.48 KB 0644
test_cooperator.py File 20.96 KB 0644
test_defer.py File 100.93 KB 0644
test_defgen.py File 10.45 KB 0644
test_dict.py File 1.41 KB 0644
test_dirdbm.py File 6.76 KB 0644
test_error.py File 8.39 KB 0644
test_factories.py File 4.53 KB 0644
test_failure.py File 29.92 KB 0644
test_fdesc.py File 7.2 KB 0644
test_finger.py File 1.95 KB 0644
test_formmethod.py File 3.56 KB 0644
test_ftp.py File 127.27 KB 0644
test_ftp_options.py File 2.62 KB 0644
test_htb.py File 3.12 KB 0644
test_ident.py File 6.85 KB 0644
test_internet.py File 45.33 KB 0644
test_iosim.py File 8.49 KB 0644
test_iutils.py File 13.13 KB 0644
test_lockfile.py File 15.14 KB 0644
test_log.py File 35.48 KB 0644
test_logfile.py File 17.8 KB 0644
test_loopback.py File 14.15 KB 0644
test_main.py File 2.44 KB 0644
test_memcache.py File 24.55 KB 0644
test_modules.py File 17.47 KB 0644
test_monkey.py File 5.5 KB 0644
test_nooldstyle.py File 5.82 KB 0644
test_paths.py File 72.61 KB 0644
test_pcp.py File 12.26 KB 0644
test_persisted.py File 14.28 KB 0644
test_plugin.py File 25.5 KB 0644
test_policies.py File 32.04 KB 0644
test_postfix.py File 3.53 KB 0644
test_process.py File 84.1 KB 0644
test_protocols.py File 7.28 KB 0644
test_randbytes.py File 3.28 KB 0644
test_rebuild.py File 8.3 KB 0644
test_reflect.py File 25.47 KB 0644
test_roots.py File 1.77 KB 0644
test_shortcut.py File 1.89 KB 0644
test_sip.py File 24.69 KB 0644
test_sob.py File 5.5 KB 0644
test_socks.py File 17.32 KB 0644
test_ssl.py File 23.29 KB 0644
test_sslverify.py File 104.28 KB 0644
test_stateful.py File 1.97 KB 0644
test_stdio.py File 12.85 KB 0644
test_strerror.py File 5.06 KB 0644
test_stringtransport.py File 12.95 KB 0644
test_strports.py File 1.75 KB 0644
test_task.py File 38.4 KB 0644
test_tcp.py File 64.07 KB 0644
test_tcp_internals.py File 8.54 KB 0644
test_text.py File 6.3 KB 0644
test_threadable.py File 3.65 KB 0644
test_threadpool.py File 22.47 KB 0644
test_threads.py File 12.96 KB 0644
test_tpfile.py File 1.56 KB 0644
test_twistd.py File 61.05 KB 0644
test_twisted.py File 18.42 KB 0644
test_udp.py File 24.1 KB 0644
test_unix.py File 14.8 KB 0644
test_usage.py File 23.09 KB 0644
testutils.py File 5.19 KB 0644