404

[ Avaa Bypassed ]




Upload:

Command:

botdev@18.220.147.154: ~ $
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.application} and its interaction with
L{twisted.persisted.sob}.
"""

from __future__ import absolute_import, division

import copy
import os
import pickle

try:
    import asyncio
except ImportError:
    asyncio = None

from twisted.application import service, internet, app, reactors
from twisted.internet import interfaces, defer, protocol, reactor
from twisted.persisted import sob
from twisted.plugins import twisted_reactors
from twisted.protocols import wire, basic
from twisted.python import usage
from twisted.python.compat import NativeStringIO
from twisted.python.runtime import platformType
from twisted.python.test.modules_helpers import TwistedModulesMixin
from twisted.test.proto_helpers import MemoryReactor
from twisted.trial import unittest



class Dummy:
    processName=None

class ServiceTests(unittest.TestCase):

    def testName(self):
        s = service.Service()
        s.setName("hello")
        self.assertEqual(s.name, "hello")

    def testParent(self):
        s = service.Service()
        p = service.MultiService()
        s.setServiceParent(p)
        self.assertEqual(list(p), [s])
        self.assertEqual(s.parent, p)

    def testApplicationAsParent(self):
        s = service.Service()
        p = service.Application("")
        s.setServiceParent(p)
        self.assertEqual(list(service.IServiceCollection(p)), [s])
        self.assertEqual(s.parent, service.IServiceCollection(p))

    def testNamedChild(self):
        s = service.Service()
        p = service.MultiService()
        s.setName("hello")
        s.setServiceParent(p)
        self.assertEqual(list(p), [s])
        self.assertEqual(s.parent, p)
        self.assertEqual(p.getServiceNamed("hello"), s)

    def testDoublyNamedChild(self):
        s = service.Service()
        p = service.MultiService()
        s.setName("hello")
        s.setServiceParent(p)
        self.assertRaises(RuntimeError, s.setName, "lala")

    def testDuplicateNamedChild(self):
        s = service.Service()
        p = service.MultiService()
        s.setName("hello")
        s.setServiceParent(p)
        s = service.Service()
        s.setName("hello")
        self.assertRaises(RuntimeError, s.setServiceParent, p)

    def testDisowning(self):
        s = service.Service()
        p = service.MultiService()
        s.setServiceParent(p)
        self.assertEqual(list(p), [s])
        self.assertEqual(s.parent, p)
        s.disownServiceParent()
        self.assertEqual(list(p), [])
        self.assertIsNone(s.parent)

    def testRunning(self):
        s = service.Service()
        self.assertFalse(s.running)
        s.startService()
        self.assertTrue(s.running)
        s.stopService()
        self.assertFalse(s.running)

    def testRunningChildren1(self):
        s = service.Service()
        p = service.MultiService()
        s.setServiceParent(p)
        self.assertFalse(s.running)
        self.assertFalse(p.running)
        p.startService()
        self.assertTrue(s.running)
        self.assertTrue(p.running)
        p.stopService()
        self.assertFalse(s.running)
        self.assertFalse(p.running)

    def testRunningChildren2(self):
        s = service.Service()
        def checkRunning():
            self.assertTrue(s.running)
        t = service.Service()
        t.stopService = checkRunning
        t.startService = checkRunning
        p = service.MultiService()
        s.setServiceParent(p)
        t.setServiceParent(p)
        p.startService()
        p.stopService()

    def testAddingIntoRunning(self):
        p = service.MultiService()
        p.startService()
        s = service.Service()
        self.assertFalse(s.running)
        s.setServiceParent(p)
        self.assertTrue(s.running)
        s.disownServiceParent()
        self.assertFalse(s.running)

    def testPrivileged(self):
        s = service.Service()
        def pss():
            s.privilegedStarted = 1
        s.privilegedStartService = pss
        s1 = service.Service()
        p = service.MultiService()
        s.setServiceParent(p)
        s1.setServiceParent(p)
        p.privilegedStartService()
        self.assertTrue(s.privilegedStarted)

    def testCopying(self):
        s = service.Service()
        s.startService()
        s1 = copy.copy(s)
        self.assertFalse(s1.running)
        self.assertTrue(s.running)


if hasattr(os, "getuid"):
    curuid = os.getuid()
    curgid = os.getgid()
else:
    curuid = curgid = 0


class ProcessTests(unittest.TestCase):

    def testID(self):
        p = service.Process(5, 6)
        self.assertEqual(p.uid, 5)
        self.assertEqual(p.gid, 6)

    def testDefaults(self):
        p = service.Process(5)
        self.assertEqual(p.uid, 5)
        self.assertIsNone(p.gid)
        p = service.Process(gid=5)
        self.assertIsNone(p.uid)
        self.assertEqual(p.gid, 5)
        p = service.Process()
        self.assertIsNone(p.uid)
        self.assertIsNone(p.gid)

    def testProcessName(self):
        p = service.Process()
        self.assertIsNone(p.processName)
        p.processName = 'hello'
        self.assertEqual(p.processName, 'hello')


class InterfacesTests(unittest.TestCase):

    def testService(self):
        self.assertTrue(service.IService.providedBy(service.Service()))

    def testMultiService(self):
        self.assertTrue(service.IService.providedBy(service.MultiService()))
        self.assertTrue(service.IServiceCollection.providedBy(
                        service.MultiService()))

    def testProcess(self):
        self.assertTrue(service.IProcess.providedBy(service.Process()))


class ApplicationTests(unittest.TestCase):

    def testConstructor(self):
        service.Application("hello")
        service.Application("hello", 5)
        service.Application("hello", 5, 6)

    def testProcessComponent(self):
        a = service.Application("hello")
        self.assertIsNone(service.IProcess(a).uid)
        self.assertIsNone(service.IProcess(a).gid)
        a = service.Application("hello", 5)
        self.assertEqual(service.IProcess(a).uid, 5)
        self.assertIsNone(service.IProcess(a).gid)
        a = service.Application("hello", 5, 6)
        self.assertEqual(service.IProcess(a).uid, 5)
        self.assertEqual(service.IProcess(a).gid, 6)

    def testServiceComponent(self):
        a = service.Application("hello")
        self.assertIs(service.IService(a), service.IServiceCollection(a))
        self.assertEqual(service.IService(a).name, "hello")
        self.assertIsNone(service.IService(a).parent)

    def testPersistableComponent(self):
        a = service.Application("hello")
        p = sob.IPersistable(a)
        self.assertEqual(p.style, 'pickle')
        self.assertEqual(p.name, 'hello')
        self.assertIs(p.original, a)

class LoadingTests(unittest.TestCase):

    def test_simpleStoreAndLoad(self):
        a = service.Application("hello")
        p = sob.IPersistable(a)
        for style in 'source pickle'.split():
            p.setStyle(style)
            p.save()
            a1 = service.loadApplication("hello.ta"+style[0], style)
            self.assertEqual(service.IService(a1).name, "hello")
        with open("hello.tac", 'w') as f:
            f.writelines([
                "from twisted.application import service\n",
                "application = service.Application('hello')\n",
            ])
        a1 = service.loadApplication("hello.tac", 'python')
        self.assertEqual(service.IService(a1).name, "hello")



class AppSupportTests(unittest.TestCase):

    def testPassphrase(self):
        self.assertIsNone(app.getPassphrase(0))

    def testLoadApplication(self):
        """
        Test loading an application file in different dump format.
        """
        a = service.Application("hello")
        baseconfig = {'file': None, 'source': None, 'python':None}
        for style in 'source pickle'.split():
            config = baseconfig.copy()
            config[{'pickle': 'file'}.get(style, style)] = 'helloapplication'
            sob.IPersistable(a).setStyle(style)
            sob.IPersistable(a).save(filename='helloapplication')
            a1 = app.getApplication(config, None)
            self.assertEqual(service.IService(a1).name, "hello")
        config = baseconfig.copy()
        config['python'] = 'helloapplication'
        with open("helloapplication", 'w') as f:
            f.writelines([
                "from twisted.application import service\n",
                "application = service.Application('hello')\n",
            ])
        a1 = app.getApplication(config, None)
        self.assertEqual(service.IService(a1).name, "hello")

    def test_convertStyle(self):
        appl = service.Application("lala")
        for instyle in 'source pickle'.split():
            for outstyle in 'source pickle'.split():
                sob.IPersistable(appl).setStyle(instyle)
                sob.IPersistable(appl).save(filename="converttest")
                app.convertStyle("converttest", instyle, None,
                                 "converttest.out", outstyle, 0)
                appl2 = service.loadApplication("converttest.out", outstyle)
                self.assertEqual(service.IService(appl2).name, "lala")


    def test_startApplication(self):
        appl = service.Application("lala")
        app.startApplication(appl, 0)
        self.assertTrue(service.IService(appl).running)


class Foo(basic.LineReceiver):
    def connectionMade(self):
        self.transport.write(b'lalala\r\n')
    def lineReceived(self, line):
        self.factory.line = line
        self.transport.loseConnection()
    def connectionLost(self, reason):
        self.factory.d.callback(self.factory.line)


class DummyApp:
    processName = None
    def addService(self, service):
        self.services[service.name] = service
    def removeService(self, service):
        del self.services[service.name]


class TimerTarget:
    def __init__(self):
        self.l = []
    def append(self, what):
        self.l.append(what)



class TestEcho(wire.Echo):
    def connectionLost(self, reason):
        self.d.callback(True)



class InternetTests(unittest.TestCase):

    def testTCP(self):
        s = service.MultiService()
        s.startService()
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.TCPServer(0, factory)
        t.setServiceParent(s)
        num = t._port.getHost().port
        factory = protocol.ClientFactory()
        factory.d = defer.Deferred()
        factory.protocol = Foo
        factory.line = None
        internet.TCPClient('127.0.0.1', num, factory).setServiceParent(s)
        factory.d.addCallback(self.assertEqual, b'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d


    def test_UDP(self):
        """
        Test L{internet.UDPServer} with a random port: starting the service
        should give it valid port, and stopService should free it so that we
        can start a server on the same port again.
        """
        if not interfaces.IReactorUDP(reactor, None):
            raise unittest.SkipTest("This reactor does not support UDP sockets")
        p = protocol.DatagramProtocol()
        t = internet.UDPServer(0, p)
        t.startService()
        num = t._port.getHost().port
        self.assertNotEqual(num, 0)
        def onStop(ignored):
            t = internet.UDPServer(num, p)
            t.startService()
            return t.stopService()
        return defer.maybeDeferred(t.stopService).addCallback(onStop)


    def testPrivileged(self):
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.TCPServer(0, factory)
        t.privileged = 1
        t.privilegedStartService()
        num = t._port.getHost().port
        factory = protocol.ClientFactory()
        factory.d = defer.Deferred()
        factory.protocol = Foo
        factory.line = None
        c = internet.TCPClient('127.0.0.1', num, factory)
        c.startService()
        factory.d.addCallback(self.assertEqual, b'lalala')
        factory.d.addCallback(lambda x : c.stopService())
        factory.d.addCallback(lambda x : t.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d

    def testConnectionGettingRefused(self):
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.TCPServer(0, factory)
        t.startService()
        num = t._port.getHost().port
        t.stopService()
        d = defer.Deferred()
        factory = protocol.ClientFactory()
        factory.clientConnectionFailed = lambda *args: d.callback(None)
        c = internet.TCPClient('127.0.0.1', num, factory)
        c.startService()
        return d

    def testUNIX(self):
        # FIXME: This test is far too dense.  It needs comments.
        #  -- spiv, 2004-11-07
        s = service.MultiService()
        s.startService()
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.UNIXServer('echo.skt', factory)
        t.setServiceParent(s)
        factory = protocol.ClientFactory()
        factory.protocol = Foo
        factory.d = defer.Deferred()
        factory.line = None
        internet.UNIXClient('echo.skt', factory).setServiceParent(s)
        factory.d.addCallback(self.assertEqual, b'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        factory.d.addCallback(self._cbTestUnix, factory, s)
        return factory.d


    def _cbTestUnix(self, ignored, factory, s):
        TestEcho.d = defer.Deferred()
        factory.line = None
        factory.d = defer.Deferred()
        s.startService()
        factory.d.addCallback(self.assertEqual, b'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d

    def testVolatile(self):
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXServer('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._port, None)
        t1 = copy.copy(t)
        self.assertIsNone(t1._port)
        t.stopService()
        self.assertIsNone(t._port)
        self.assertFalse(t.running)

        factory = protocol.ClientFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXClient('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._connection, None)
        t1 = copy.copy(t)
        self.assertIsNone(t1._connection)
        t.stopService()
        self.assertIsNone(t._connection)
        self.assertFalse(t.running)

    def testStoppingServer(self):
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXServer('echo.skt', factory)
        t.startService()
        t.stopService()
        self.assertFalse(t.running)
        factory = protocol.ClientFactory()
        d = defer.Deferred()
        factory.clientConnectionFailed = lambda *args: d.callback(None)
        reactor.connectUNIX('echo.skt', factory)
        return d

    if not interfaces.IReactorUNIX(reactor, None):
        _skipMsg = "This reactor does not support UNIX domain sockets"
        testUNIX.skip = _skipMsg
        testVolatile.skip = _skipMsg
        testStoppingServer.skip = _skipMsg


    def testPickledTimer(self):
        target = TimerTarget()
        t0 = internet.TimerService(1, target.append, "hello")
        t0.startService()
        s = pickle.dumps(t0)
        t0.stopService()

        t = pickle.loads(s)
        self.assertFalse(t.running)

    def testBrokenTimer(self):
        d = defer.Deferred()
        t = internet.TimerService(1, lambda: 1 // 0)
        oldFailed = t._failed
        def _failed(why):
            oldFailed(why)
            d.callback(None)
        t._failed = _failed
        t.startService()
        d.addCallback(lambda x : t.stopService)
        d.addCallback(lambda x : self.assertEqual(
            [ZeroDivisionError],
            [o.value.__class__ for o in self.flushLoggedErrors(ZeroDivisionError)]))
        return d


    def test_everythingThere(self):
        """
        L{twisted.application.internet} dynamically defines a set of
        L{service.Service} subclasses that in general have corresponding
        reactor.listenXXX or reactor.connectXXX calls.
        """
        trans = 'TCP UNIX SSL UDP UNIXDatagram Multicast'.split()
        for tran in trans[:]:
            if not getattr(interfaces, "IReactor" + tran)(reactor, None):
                trans.remove(tran)
        for tran in trans:
            for side in 'Server Client'.split():
                if tran == "Multicast" and side == "Client":
                    continue
                if tran == "UDP" and side == "Client":
                    continue
                self.assertTrue(hasattr(internet, tran + side))
                method = getattr(internet, tran + side).method
                prefix = {'Server': 'listen', 'Client': 'connect'}[side]
                self.assertTrue(hasattr(reactor, prefix + method) or
                        (prefix == "connect" and method == "UDP"))
                o = getattr(internet, tran + side)()
                self.assertEqual(service.IService(o), o)


    def test_importAll(self):
        """
        L{twisted.application.internet} dynamically defines L{service.Service}
        subclasses. This test ensures that the subclasses exposed by C{__all__}
        are valid attributes of the module.
        """
        for cls in internet.__all__:
            self.assertTrue(
                hasattr(internet, cls),
                '%s not importable from twisted.application.internet' % (cls,))


    def test_reactorParametrizationInServer(self):
        """
        L{internet._AbstractServer} supports a C{reactor} keyword argument
        that can be used to parametrize the reactor used to listen for
        connections.
        """
        reactor = MemoryReactor()

        factory = object()
        t = internet.TCPServer(1234, factory, reactor=reactor)
        t.startService()
        self.assertEqual(reactor.tcpServers.pop()[:2], (1234, factory))


    def test_reactorParametrizationInClient(self):
        """
        L{internet._AbstractClient} supports a C{reactor} keyword arguments
        that can be used to parametrize the reactor used to create new client
        connections.
        """
        reactor = MemoryReactor()

        factory = protocol.ClientFactory()
        t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
        t.startService()
        self.assertEqual(
            reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))


    def test_reactorParametrizationInServerMultipleStart(self):
        """
        Like L{test_reactorParametrizationInServer}, but stop and restart the
        service and check that the given reactor is still used.
        """
        reactor = MemoryReactor()

        factory = protocol.Factory()
        t = internet.TCPServer(1234, factory, reactor=reactor)
        t.startService()
        self.assertEqual(reactor.tcpServers.pop()[:2], (1234, factory))
        t.stopService()
        t.startService()
        self.assertEqual(reactor.tcpServers.pop()[:2], (1234, factory))


    def test_reactorParametrizationInClientMultipleStart(self):
        """
        Like L{test_reactorParametrizationInClient}, but stop and restart the
        service and check that the given reactor is still used.
        """
        reactor = MemoryReactor()

        factory = protocol.ClientFactory()
        t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
        t.startService()
        self.assertEqual(
            reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
        t.stopService()
        t.startService()
        self.assertEqual(
            reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))



class TimerBasicTests(unittest.TestCase):

    def testTimerRuns(self):
        d = defer.Deferred()
        self.t = internet.TimerService(1, d.callback, 'hello')
        self.t.startService()
        d.addCallback(self.assertEqual, 'hello')
        d.addCallback(lambda x : self.t.stopService())
        d.addCallback(lambda x : self.assertFalse(self.t.running))
        return d

    def tearDown(self):
        return self.t.stopService()

    def testTimerRestart(self):
        # restart the same TimerService
        d1 = defer.Deferred()
        d2 = defer.Deferred()
        work = [(d2, "bar"), (d1, "foo")]
        def trigger():
            d, arg = work.pop()
            d.callback(arg)
        self.t = internet.TimerService(1, trigger)
        self.t.startService()
        def onFirstResult(result):
            self.assertEqual(result, 'foo')
            return self.t.stopService()
        def onFirstStop(ignored):
            self.assertFalse(self.t.running)
            self.t.startService()
            return d2
        def onSecondResult(result):
            self.assertEqual(result, 'bar')
            self.t.stopService()
        d1.addCallback(onFirstResult)
        d1.addCallback(onFirstStop)
        d1.addCallback(onSecondResult)
        return d1

    def testTimerLoops(self):
        l = []
        def trigger(data, number, d):
            l.append(data)
            if len(l) == number:
                d.callback(l)
        d = defer.Deferred()
        self.t = internet.TimerService(0.01, trigger, "hello", 10, d)
        self.t.startService()
        d.addCallback(self.assertEqual, ['hello'] * 10)
        d.addCallback(lambda x : self.t.stopService())
        return d


class FakeReactor(reactors.Reactor):
    """
    A fake reactor with a hooked install method.
    """

    def __init__(self, install, *args, **kwargs):
        """
        @param install: any callable that will be used as install method.
        @type install: C{callable}
        """
        reactors.Reactor.__init__(self, *args, **kwargs)
        self.install = install



class PluggableReactorTests(TwistedModulesMixin, unittest.TestCase):
    """
    Tests for the reactor discovery/inspection APIs.
    """

    def setUp(self):
        """
        Override the L{reactors.getPlugins} function, normally bound to
        L{twisted.plugin.getPlugins}, in order to control which
        L{IReactorInstaller} plugins are seen as available.

        C{self.pluginResults} can be customized and will be used as the
        result of calls to C{reactors.getPlugins}.
        """
        self.pluginCalls = []
        self.pluginResults = []
        self.originalFunction = reactors.getPlugins
        reactors.getPlugins = self._getPlugins


    def tearDown(self):
        """
        Restore the original L{reactors.getPlugins}.
        """
        reactors.getPlugins = self.originalFunction


    def _getPlugins(self, interface, package=None):
        """
        Stand-in for the real getPlugins method which records its arguments
        and returns a fixed result.
        """
        self.pluginCalls.append((interface, package))
        return list(self.pluginResults)


    def test_getPluginReactorTypes(self):
        """
        Test that reactor plugins are returned from L{getReactorTypes}
        """
        name = 'fakereactortest'
        package = __name__ + '.fakereactor'
        description = 'description'
        self.pluginResults = [reactors.Reactor(name, package, description)]
        reactorTypes = reactors.getReactorTypes()

        self.assertEqual(
            self.pluginCalls,
            [(reactors.IReactorInstaller, None)])

        for r in reactorTypes:
            if r.shortName == name:
                self.assertEqual(r.description, description)
                break
        else:
            self.fail("Reactor plugin not present in getReactorTypes() result")


    def test_reactorInstallation(self):
        """
        Test that L{reactors.Reactor.install} loads the correct module and
        calls its install attribute.
        """
        installed = []
        def install():
            installed.append(True)
        fakeReactor = FakeReactor(install,
                                  'fakereactortest', __name__, 'described')
        modules = {'fakereactortest': fakeReactor}
        self.replaceSysModules(modules)
        installer = reactors.Reactor('fakereactor', 'fakereactortest', 'described')
        installer.install()
        self.assertEqual(installed, [True])


    def test_installReactor(self):
        """
        Test that the L{reactors.installReactor} function correctly installs
        the specified reactor.
        """
        installed = []
        def install():
            installed.append(True)
        name = 'fakereactortest'
        package = __name__
        description = 'description'
        self.pluginResults = [FakeReactor(install, name, package, description)]
        reactors.installReactor(name)
        self.assertEqual(installed, [True])


    def test_installReactorReturnsReactor(self):
        """
        Test that the L{reactors.installReactor} function correctly returns
        the installed reactor.
        """
        reactor = object()
        def install():
            from twisted import internet
            self.patch(internet, 'reactor', reactor)
        name = 'fakereactortest'
        package = __name__
        description = 'description'
        self.pluginResults = [FakeReactor(install, name, package, description)]
        installed = reactors.installReactor(name)
        self.assertIs(installed, reactor)


    def test_installReactorMultiplePlugins(self):
        """
        Test that the L{reactors.installReactor} function correctly installs
        the specified reactor when there are multiple reactor plugins.
        """
        installed = []
        def install():
            installed.append(True)
        name = 'fakereactortest'
        package = __name__
        description = 'description'
        fakeReactor = FakeReactor(install, name, package, description)
        otherReactor = FakeReactor(lambda: None,
                                   "otherreactor", package, description)
        self.pluginResults = [otherReactor, fakeReactor]
        reactors.installReactor(name)
        self.assertEqual(installed, [True])


    def test_installNonExistentReactor(self):
        """
        Test that L{reactors.installReactor} raises L{reactors.NoSuchReactor}
        when asked to install a reactor which it cannot find.
        """
        self.pluginResults = []
        self.assertRaises(
            reactors.NoSuchReactor,
            reactors.installReactor, 'somereactor')


    def test_installNotAvailableReactor(self):
        """
        Test that L{reactors.installReactor} raises an exception when asked to
        install a reactor which doesn't work in this environment.
        """
        def install():
            raise ImportError("Missing foo bar")
        name = 'fakereactortest'
        package = __name__
        description = 'description'
        self.pluginResults = [FakeReactor(install, name, package, description)]
        self.assertRaises(ImportError, reactors.installReactor, name)


    def test_reactorSelectionMixin(self):
        """
        Test that the reactor selected is installed as soon as possible, ie
        when the option is parsed.
        """
        executed = []
        INSTALL_EVENT = 'reactor installed'
        SUBCOMMAND_EVENT = 'subcommands loaded'

        class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
            def subCommands(self):
                executed.append(SUBCOMMAND_EVENT)
                return [('subcommand', None, lambda: self, 'test subcommand')]
            subCommands = property(subCommands)

        def install():
            executed.append(INSTALL_EVENT)
        self.pluginResults = [
            FakeReactor(install, 'fakereactortest', __name__, 'described')
        ]

        options = ReactorSelectionOptions()
        options.parseOptions(['--reactor', 'fakereactortest', 'subcommand'])
        self.assertEqual(executed[0], INSTALL_EVENT)
        self.assertEqual(executed.count(INSTALL_EVENT), 1)
        self.assertEqual(options["reactor"], "fakereactortest")


    def test_reactorSelectionMixinNonExistent(self):
        """
        Test that the usage mixin exits when trying to use a non existent
        reactor (the name not matching to any reactor), giving an error
        message.
        """
        class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
            pass
        self.pluginResults = []

        options = ReactorSelectionOptions()
        options.messageOutput = NativeStringIO()
        e = self.assertRaises(usage.UsageError, options.parseOptions,
                              ['--reactor', 'fakereactortest', 'subcommand'])
        self.assertIn("fakereactortest", e.args[0])
        self.assertIn("help-reactors", e.args[0])


    def test_reactorSelectionMixinNotAvailable(self):
        """
        Test that the usage mixin exits when trying to use a reactor not
        available (the reactor raises an error at installation), giving an
        error message.
        """
        class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
            pass
        message = "Missing foo bar"
        def install():
            raise ImportError(message)

        name = 'fakereactortest'
        package = __name__
        description = 'description'
        self.pluginResults = [FakeReactor(install, name, package, description)]

        options = ReactorSelectionOptions()
        options.messageOutput = NativeStringIO()
        e =  self.assertRaises(usage.UsageError, options.parseOptions,
                               ['--reactor', 'fakereactortest', 'subcommand'])
        self.assertIn(message, e.args[0])
        self.assertIn("help-reactors", e.args[0])



class HelpReactorsTests(unittest.TestCase):
    """
    --help-reactors lists the available reactors
    """
    def setUp(self):
        """
        Get the text from --help-reactors
        """
        self.options = app.ReactorSelectionMixin()
        self.options.messageOutput = NativeStringIO()
        self.assertRaises(SystemExit, self.options.opt_help_reactors)
        self.message = self.options.messageOutput.getvalue()


    def test_lacksAsyncIO(self):
        """
        --help-reactors should NOT display the asyncio reactor on Python < 3.4
        """
        self.assertIn(twisted_reactors.asyncio.description, self.message)
        self.assertIn("!" + twisted_reactors.asyncio.shortName, self.message)
    if asyncio:
        test_lacksAsyncIO.skip = "Not applicable, asyncio is available"


    def test_hasAsyncIO(self):
        """
        --help-reactors should display the asyncio reactor on Python >= 3.4
        """
        self.assertIn(twisted_reactors.asyncio.description, self.message)
        self.assertNotIn(
            "!" + twisted_reactors.asyncio.shortName, self.message)
    if not asyncio:
        test_hasAsyncIO.skip = "asyncio library not available"


    def test_iocpWin32(self):
        """
        --help-reactors should display the iocp reactor on Windows
        """
        self.assertIn(twisted_reactors.iocp.description, self.message)
        self.assertNotIn("!" + twisted_reactors.iocp.shortName, self.message)
    if platformType != "win32":
        test_iocpWin32.skip = "Test only applicable on Windows"


    def test_iocpNotWin32(self):
        """
        --help-reactors should NOT display the iocp reactor on Windows
        """
        self.assertIn(twisted_reactors.iocp.description, self.message)
        self.assertIn("!" + twisted_reactors.iocp.shortName, self.message)
    if platformType == "win32":
        test_iocpNotWin32.skip = "Test only applicable on Windows"


    def test_onlySupportedReactors(self):
        """
        --help-reactors with only supported reactors
        """
        def getReactorTypes():
            yield twisted_reactors.default

        options = app.ReactorSelectionMixin()
        options._getReactorTypes = getReactorTypes
        options.messageOutput = NativeStringIO()
        self.assertRaises(SystemExit, options.opt_help_reactors)
        message = options.messageOutput.getvalue()
        self.assertNotIn("reactors not available", message)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 103 B 0644
cert.pem.no_trailing_newline File 1.38 KB 0644
crash_test_dummy.py File 543 B 0644
iosim.py File 17.3 KB 0644
key.pem.no_trailing_newline File 1.67 KB 0644
mock_win32process.py File 1.46 KB 0644
myrebuilder1.py File 158 B 0644
myrebuilder2.py File 158 B 0644
plugin_basic.py File 943 B 0644
plugin_extra1.py File 407 B 0644
plugin_extra2.py File 579 B 0644
process_cmdline.py File 162 B 0644
process_echoer.py File 214 B 0644
process_fds.py File 945 B 0644
process_getargv.py File 283 B 0644
process_getenv.py File 268 B 0644
process_linger.py File 286 B 0644
process_reader.py File 188 B 0644
process_signal.py File 214 B 0644
process_stdinreader.py File 857 B 0644
process_tester.py File 1.01 KB 0644
process_tty.py File 130 B 0644
process_twisted.py File 1.18 KB 0644
proto_helpers.py File 26.33 KB 0644
raiser.c File 93.05 KB 0644
raiser.cpython-36m-x86_64-linux-gnu.so File 19.16 KB 0644
raiser.pyx File 466 B 0644
reflect_helper_IE.py File 61 B 0644
reflect_helper_VE.py File 82 B 0644
reflect_helper_ZDE.py File 47 B 0644
server.pem File 4.34 KB 0644
ssl_helpers.py File 1.01 KB 0644
stdio_test_consumer.py File 1.19 KB 0644
stdio_test_halfclose.py File 1.89 KB 0644
stdio_test_hostpeer.py File 1021 B 0644
stdio_test_lastwrite.py File 1.18 KB 0644
stdio_test_loseconn.py File 1.51 KB 0644
stdio_test_producer.py File 1.47 KB 0644
stdio_test_write.py File 923 B 0644
stdio_test_writeseq.py File 915 B 0644
test_abstract.py File 3.42 KB 0644
test_adbapi.py File 25.53 KB 0644
test_amp.py File 107.96 KB 0644
test_application.py File 32.05 KB 0644
test_compat.py File 27.32 KB 0644
test_context.py File 1.48 KB 0644
test_cooperator.py File 20.96 KB 0644
test_defer.py File 100.93 KB 0644
test_defgen.py File 10.45 KB 0644
test_dict.py File 1.41 KB 0644
test_dirdbm.py File 6.76 KB 0644
test_error.py File 8.39 KB 0644
test_factories.py File 4.53 KB 0644
test_failure.py File 29.92 KB 0644
test_fdesc.py File 7.2 KB 0644
test_finger.py File 1.95 KB 0644
test_formmethod.py File 3.56 KB 0644
test_ftp.py File 127.27 KB 0644
test_ftp_options.py File 2.62 KB 0644
test_htb.py File 3.12 KB 0644
test_ident.py File 6.85 KB 0644
test_internet.py File 45.33 KB 0644
test_iosim.py File 8.49 KB 0644
test_iutils.py File 13.13 KB 0644
test_lockfile.py File 15.14 KB 0644
test_log.py File 35.48 KB 0644
test_logfile.py File 17.8 KB 0644
test_loopback.py File 14.15 KB 0644
test_main.py File 2.44 KB 0644
test_memcache.py File 24.55 KB 0644
test_modules.py File 17.47 KB 0644
test_monkey.py File 5.5 KB 0644
test_nooldstyle.py File 5.82 KB 0644
test_paths.py File 72.61 KB 0644
test_pcp.py File 12.26 KB 0644
test_persisted.py File 14.28 KB 0644
test_plugin.py File 25.5 KB 0644
test_policies.py File 32.04 KB 0644
test_postfix.py File 3.53 KB 0644
test_process.py File 84.1 KB 0644
test_protocols.py File 7.28 KB 0644
test_randbytes.py File 3.28 KB 0644
test_rebuild.py File 8.3 KB 0644
test_reflect.py File 25.47 KB 0644
test_roots.py File 1.77 KB 0644
test_shortcut.py File 1.89 KB 0644
test_sip.py File 24.69 KB 0644
test_sob.py File 5.5 KB 0644
test_socks.py File 17.32 KB 0644
test_ssl.py File 23.29 KB 0644
test_sslverify.py File 104.28 KB 0644
test_stateful.py File 1.97 KB 0644
test_stdio.py File 12.85 KB 0644
test_strerror.py File 5.06 KB 0644
test_stringtransport.py File 12.95 KB 0644
test_strports.py File 1.75 KB 0644
test_task.py File 38.4 KB 0644
test_tcp.py File 64.07 KB 0644
test_tcp_internals.py File 8.54 KB 0644
test_text.py File 6.3 KB 0644
test_threadable.py File 3.65 KB 0644
test_threadpool.py File 22.47 KB 0644
test_threads.py File 12.96 KB 0644
test_tpfile.py File 1.56 KB 0644
test_twistd.py File 61.05 KB 0644
test_twisted.py File 18.42 KB 0644
test_udp.py File 24.1 KB 0644
test_unix.py File 14.8 KB 0644
test_usage.py File 23.09 KB 0644
testutils.py File 5.19 KB 0644