404

[ Avaa Bypassed ]




Upload:

Command:

botdev@18.218.137.145: ~ $
# -*- test-case-name: twisted.test.test_threadpool -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
twisted.python.threadpool: a pool of threads to which we dispatch tasks.

In most cases you can just use C{reactor.callInThread} and friends
instead of creating a thread pool directly.
"""

from __future__ import division, absolute_import

import threading

from twisted._threads import pool as _pool
from twisted.python import log, context
from twisted.python.failure import Failure
from twisted.python._oldstyle import _oldStyle


WorkerStop = object()



@_oldStyle
class ThreadPool:
    """
    This class (hopefully) generalizes the functionality of a pool of threads
    to which work can be dispatched.

    L{callInThread} and L{stop} should only be called from a single thread.

    @ivar started: Whether or not the thread pool is currently running.
    @type started: L{bool}

    @ivar threads: List of workers currently running in this thread pool.
    @type threads: L{list}

    @ivar _pool: A hook for testing.
    @type _pool: callable compatible with L{_pool}
    """
    min = 5
    max = 20
    joined = False
    started = False
    workers = 0
    name = None

    threadFactory = threading.Thread
    currentThread = staticmethod(threading.currentThread)
    _pool = staticmethod(_pool)

    def __init__(self, minthreads=5, maxthreads=20, name=None):
        """
        Create a new threadpool.

        @param minthreads: minimum number of threads in the pool
        @type minthreads: L{int}

        @param maxthreads: maximum number of threads in the pool
        @type maxthreads: L{int}

        @param name: The name to give this threadpool; visible in log messages.
        @type name: native L{str}
        """
        assert minthreads >= 0, 'minimum is negative'
        assert minthreads <= maxthreads, 'minimum is greater than maximum'
        self.min = minthreads
        self.max = maxthreads
        self.name = name
        self.threads = []

        def trackingThreadFactory(*a, **kw):
            thread = self.threadFactory(*a, name=self._generateName(), **kw)
            self.threads.append(thread)
            return thread

        def currentLimit():
            if not self.started:
                return 0
            return self.max

        self._team = self._pool(currentLimit, trackingThreadFactory)


    @property
    def workers(self):
        """
        For legacy compatibility purposes, return a total number of workers.

        @return: the current number of workers, both idle and busy (but not
            those that have been quit by L{ThreadPool.adjustPoolsize})
        @rtype: L{int}
        """
        stats = self._team.statistics()
        return stats.idleWorkerCount + stats.busyWorkerCount


    @property
    def working(self):
        """
        For legacy compatibility purposes, return the number of busy workers as
        expressed by a list the length of that number.

        @return: the number of workers currently processing a work item.
        @rtype: L{list} of L{None}
        """
        return [None] * self._team.statistics().busyWorkerCount


    @property
    def waiters(self):
        """
        For legacy compatibility purposes, return the number of idle workers as
        expressed by a list the length of that number.

        @return: the number of workers currently alive (with an allocated
            thread) but waiting for new work.
        @rtype: L{list} of L{None}
        """
        return [None] * self._team.statistics().idleWorkerCount


    @property
    def _queue(self):
        """
        For legacy compatibility purposes, return an object with a C{qsize}
        method that indicates the amount of work not yet allocated to a worker.

        @return: an object with a C{qsize} method.
        """
        class NotAQueue(object):
            def qsize(q):
                """
                Pretend to be a Python threading Queue and return the
                number of as-yet-unconsumed tasks.

                @return: the amount of backlogged work not yet dispatched to a
                    worker.
                @rtype: L{int}
                """
                return self._team.statistics().backloggedWorkCount
        return NotAQueue()

    q = _queue                  # Yes, twistedchecker, I want a single-letter
                                # attribute name.


    def start(self):
        """
        Start the threadpool.
        """
        self.joined = False
        self.started = True
        # Start some threads.
        self.adjustPoolsize()
        backlog = self._team.statistics().backloggedWorkCount
        if backlog:
            self._team.grow(backlog)


    def startAWorker(self):
        """
        Increase the number of available workers for the thread pool by 1, up
        to the maximum allowed by L{ThreadPool.max}.
        """
        self._team.grow(1)


    def _generateName(self):
        """
        Generate a name for a new pool thread.

        @return: A distinctive name for the thread.
        @rtype: native L{str}
        """
        return "PoolThread-%s-%s" % (self.name or id(self), self.workers)


    def stopAWorker(self):
        """
        Decrease the number of available workers by 1, by quitting one as soon
        as it's idle.
        """
        self._team.shrink(1)


    def __setstate__(self, state):
        setattr(self, "__dict__", state)
        ThreadPool.__init__(self, self.min, self.max)


    def __getstate__(self):
        state = {}
        state['min'] = self.min
        state['max'] = self.max
        return state


    def callInThread(self, func, *args, **kw):
        """
        Call a callable object in a separate thread.

        @param func: callable object to be called in separate thread

        @param args: positional arguments to be passed to C{func}

        @param kw: keyword args to be passed to C{func}
        """
        self.callInThreadWithCallback(None, func, *args, **kw)


    def callInThreadWithCallback(self, onResult, func, *args, **kw):
        """
        Call a callable object in a separate thread and call C{onResult} with
        the return value, or a L{twisted.python.failure.Failure} if the
        callable raises an exception.

        The callable is allowed to block, but the C{onResult} function must not
        block and should perform as little work as possible.

        A typical action for C{onResult} for a threadpool used with a Twisted
        reactor would be to schedule a L{twisted.internet.defer.Deferred} to
        fire in the main reactor thread using C{.callFromThread}.  Note that
        C{onResult} is called inside the separate thread, not inside the
        reactor thread.

        @param onResult: a callable with the signature C{(success, result)}.
            If the callable returns normally, C{onResult} is called with
            C{(True, result)} where C{result} is the return value of the
            callable.  If the callable throws an exception, C{onResult} is
            called with C{(False, failure)}.

            Optionally, C{onResult} may be L{None}, in which case it is not
            called at all.

        @param func: callable object to be called in separate thread

        @param args: positional arguments to be passed to C{func}

        @param kw: keyword arguments to be passed to C{func}
        """
        if self.joined:
            return
        ctx = context.theContextTracker.currentContext().contexts[-1]

        def inContext():
            try:
                result = inContext.theWork()
                ok = True
            except:
                result = Failure()
                ok = False

            inContext.theWork = None
            if inContext.onResult is not None:
                inContext.onResult(ok, result)
                inContext.onResult = None
            elif not ok:
                log.err(result)

        # Avoid closing over func, ctx, args, kw so that we can carefully
        # manage their lifecycle.  See
        # test_threadCreationArgumentsCallInThreadWithCallback.
        inContext.theWork = lambda: context.call(ctx, func, *args, **kw)
        inContext.onResult = onResult

        self._team.do(inContext)


    def stop(self):
        """
        Shutdown the threads in the threadpool.
        """
        self.joined = True
        self.started = False
        self._team.quit()
        for thread in self.threads:
            thread.join()


    def adjustPoolsize(self, minthreads=None, maxthreads=None):
        """
        Adjust the number of available threads by setting C{min} and C{max} to
        new values.

        @param minthreads: The new value for L{ThreadPool.min}.

        @param maxthreads: The new value for L{ThreadPool.max}.
        """
        if minthreads is None:
            minthreads = self.min
        if maxthreads is None:
            maxthreads = self.max

        assert minthreads >= 0, 'minimum is negative'
        assert minthreads <= maxthreads, 'minimum is greater than maximum'

        self.min = minthreads
        self.max = maxthreads
        if not self.started:
            return

        # Kill of some threads if we have too many.
        if self.workers > self.max:
            self._team.shrink(self.workers - self.max)
        # Start some threads if we have too few.
        if self.workers < self.min:
            self._team.grow(self.min - self.workers)


    def dumpStats(self):
        """
        Dump some plain-text informational messages to the log about the state
        of this L{ThreadPool}.
        """
        log.msg('waiters: %s' % (self.waiters,))
        log.msg('workers: %s' % (self.working,))
        log.msg('total: %s'   % (self.threads,))

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
_pydoctortemplates Folder 0755
test Folder 0755
__init__.py File 674 B 0644
_appdirs.py File 788 B 0644
_inotify.py File 3.37 KB 0644
_oldstyle.py File 2.53 KB 0644
_release.py File 18.03 KB 0644
_sendmsg.c File 15.42 KB 0644
_setup.py File 12.6 KB 0644
_shellcomp.py File 23.76 KB 0644
_textattributes.py File 8.87 KB 0644
_tzhelper.py File 3.12 KB 0644
_url.py File 253 B 0644
compat.py File 21.91 KB 0644
components.py File 13.96 KB 0644
constants.py File 544 B 0644
context.py File 3.93 KB 0644
deprecate.py File 26.15 KB 0644
failure.py File 23.38 KB 0644
fakepwd.py File 5.99 KB 0644
filepath.py File 57.51 KB 0644
formmethod.py File 11.19 KB 0644
htmlizer.py File 3.27 KB 0644
lockfile.py File 7.54 KB 0644
log.py File 21.95 KB 0644
logfile.py File 9.85 KB 0644
modules.py File 26.5 KB 0644
monkey.py File 2.17 KB 0644
procutils.py File 1.39 KB 0644
randbytes.py File 3.87 KB 0644
rebuild.py File 9.04 KB 0644
reflect.py File 19.02 KB 0644
release.py File 1.16 KB 0644
roots.py File 7.23 KB 0644
runtime.py File 6.13 KB 0644
sendmsg.py File 3.34 KB 0644
shortcut.py File 2.2 KB 0644
syslog.py File 3.64 KB 0644
systemd.py File 2.77 KB 0644
text.py File 5.35 KB 0644
threadable.py File 3.22 KB 0644
threadpool.py File 9.61 KB 0644
twisted-completion.zsh File 1.33 KB 0644
url.py File 244 B 0644
urlpath.py File 8.87 KB 0644
usage.py File 34.19 KB 0644
util.py File 27.29 KB 0644
versions.py File 322 B 0644
win32.py File 5.42 KB 0644
zippath.py File 9.02 KB 0644
zipstream.py File 9.53 KB 0644