404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.14.135.249: ~ $
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for implementations of L{IReactorUNIX} and L{IReactorUNIXDatagram}.
"""

from __future__ import division, absolute_import

import os
import sys
import types
import socket

from twisted.internet import interfaces, reactor, protocol, error, address
from twisted.internet import defer, utils
from twisted.python import lockfile
from twisted.python.compat import _PY3, networkString
from twisted.python.filepath import FilePath
from twisted.trial import unittest

from twisted.test.test_tcp import MyServerFactory, MyClientFactory


class FailedConnectionClientFactory(protocol.ClientFactory):
    def __init__(self, onFail):
        self.onFail = onFail

    def clientConnectionFailed(self, connector, reason):
        self.onFail.errback(reason)



class UnixSocketTests(unittest.TestCase):
    """
    Test unix sockets.
    """
    def test_peerBind(self):
        """
        The address passed to the server factory's C{buildProtocol} method and
        the address returned by the connected protocol's transport's C{getPeer}
        method match the address the client socket is bound to.
        """
        filename = self.mktemp()
        peername = self.mktemp()
        serverFactory = MyServerFactory()
        connMade = serverFactory.protocolConnectionMade = defer.Deferred()
        unixPort = reactor.listenUNIX(filename, serverFactory)
        self.addCleanup(unixPort.stopListening)
        unixSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.addCleanup(unixSocket.close)
        unixSocket.bind(peername)
        unixSocket.connect(filename)
        def cbConnMade(proto):
            expected = address.UNIXAddress(peername)
            self.assertEqual(serverFactory.peerAddresses, [expected])
            self.assertEqual(proto.transport.getPeer(), expected)
        connMade.addCallback(cbConnMade)
        return connMade


    def test_dumber(self):
        """
        L{IReactorUNIX.connectUNIX} can be used to connect a client to a server
        started with L{IReactorUNIX.listenUNIX}.
        """
        filename = self.mktemp()
        serverFactory = MyServerFactory()
        serverConnMade = defer.Deferred()
        serverFactory.protocolConnectionMade = serverConnMade
        unixPort = reactor.listenUNIX(filename, serverFactory)
        self.addCleanup(unixPort.stopListening)
        clientFactory = MyClientFactory()
        clientConnMade = defer.Deferred()
        clientFactory.protocolConnectionMade = clientConnMade
        reactor.connectUNIX(filename, clientFactory)
        d = defer.gatherResults([serverConnMade, clientConnMade])
        def allConnected(args):
            serverProtocol, clientProtocol = args
            # Incidental assertion which may or may not be redundant with some
            # other test.  This probably deserves its own test method.
            self.assertEqual(clientFactory.peerAddresses,
                             [address.UNIXAddress(filename)])

            clientProtocol.transport.loseConnection()
            serverProtocol.transport.loseConnection()
        d.addCallback(allConnected)
        return d


    def test_pidFile(self):
        """
        A lockfile is created and locked when L{IReactorUNIX.listenUNIX} is
        called and released when the Deferred returned by the L{IListeningPort}
        provider's C{stopListening} method is called back.
        """
        filename = self.mktemp()
        serverFactory = MyServerFactory()
        serverConnMade = defer.Deferred()
        serverFactory.protocolConnectionMade = serverConnMade
        unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
        self.assertTrue(lockfile.isLocked(filename + ".lock"))

        # XXX This part would test something about the checkPID parameter, but
        # it doesn't actually.  It should be rewritten to test the several
        # different possible behaviors.  -exarkun
        clientFactory = MyClientFactory()
        clientConnMade = defer.Deferred()
        clientFactory.protocolConnectionMade = clientConnMade
        reactor.connectUNIX(filename, clientFactory, checkPID=1)

        d = defer.gatherResults([serverConnMade, clientConnMade])
        def _portStuff(args):
            serverProtocol, clientProto = args

            # Incidental assertion which may or may not be redundant with some
            # other test.  This probably deserves its own test method.
            self.assertEqual(clientFactory.peerAddresses,
                             [address.UNIXAddress(filename)])

            clientProto.transport.loseConnection()
            serverProtocol.transport.loseConnection()
            return unixPort.stopListening()
        d.addCallback(_portStuff)

        def _check(ignored):
            self.assertFalse(lockfile.isLocked(filename + ".lock"), 'locked')
        d.addCallback(_check)
        return d


    def test_socketLocking(self):
        """
        L{IReactorUNIX.listenUNIX} raises L{error.CannotListenError} if passed
        the name of a file on which a server is already listening.
        """
        filename = self.mktemp()
        serverFactory = MyServerFactory()
        unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)

        self.assertRaises(
            error.CannotListenError,
            reactor.listenUNIX, filename, serverFactory, wantPID=True)

        def stoppedListening(ign):
            unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
            return unixPort.stopListening()

        return unixPort.stopListening().addCallback(stoppedListening)


    def _uncleanSocketTest(self, callback):
        self.filename = self.mktemp()
        source = networkString((
            "from twisted.internet import protocol, reactor\n"
            "reactor.listenUNIX(%r, protocol.ServerFactory(),"
            "wantPID=True)\n") % (self.filename,))
        env = {b'PYTHONPATH': FilePath(
            os.pathsep.join(sys.path)).asBytesMode().path}
        pyExe = FilePath(sys.executable).asBytesMode().path

        d = utils.getProcessValue(pyExe, (b"-u", b"-c", source), env=env)
        d.addCallback(callback)
        return d


    def test_uncleanServerSocketLocking(self):
        """
        If passed C{True} for the C{wantPID} parameter, a server can be started
        listening with L{IReactorUNIX.listenUNIX} when passed the name of a
        file on which a previous server which has not exited cleanly has been
        listening using the C{wantPID} option.
        """
        def ranStupidChild(ign):
            # If this next call succeeds, our lock handling is correct.
            p = reactor.listenUNIX(self.filename, MyServerFactory(), wantPID=True)
            return p.stopListening()
        return self._uncleanSocketTest(ranStupidChild)


    def test_connectToUncleanServer(self):
        """
        If passed C{True} for the C{checkPID} parameter, a client connection
        attempt made with L{IReactorUNIX.connectUNIX} fails with
        L{error.BadFileError}.
        """
        def ranStupidChild(ign):
            d = defer.Deferred()
            f = FailedConnectionClientFactory(d)
            reactor.connectUNIX(self.filename, f, checkPID=True)
            return self.assertFailure(d, error.BadFileError)
        return self._uncleanSocketTest(ranStupidChild)


    def _reprTest(self, serverFactory, factoryName):
        """
        Test the C{__str__} and C{__repr__} implementations of a UNIX port when
        used with the given factory.
        """
        filename = self.mktemp()
        unixPort = reactor.listenUNIX(filename, serverFactory)

        connectedString = "<%s on %r>" % (factoryName, filename)
        self.assertEqual(repr(unixPort), connectedString)
        self.assertEqual(str(unixPort), connectedString)

        d = defer.maybeDeferred(unixPort.stopListening)
        def stoppedListening(ign):
            unconnectedString = "<%s (not listening)>" % (factoryName,)
            self.assertEqual(repr(unixPort), unconnectedString)
            self.assertEqual(str(unixPort), unconnectedString)
        d.addCallback(stoppedListening)
        return d


    def test_reprWithClassicFactory(self):
        """
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIX.listenUNIX} contains the name of the classic factory
        class being used and the filename on which the port is listening or
        indicates that the port is not listening.
        """
        class ClassicFactory:
            def doStart(self):
                pass

            def doStop(self):
                pass

        # Sanity check
        self.assertIsInstance(ClassicFactory, types.ClassType)

        return self._reprTest(
            ClassicFactory(), "twisted.test.test_unix.ClassicFactory")

    if _PY3:
        test_reprWithClassicFactory.skip = (
            "Classic classes do not exist on Python 3.")


    def test_reprWithNewStyleFactory(self):
        """
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIX.listenUNIX} contains the name of the new-style factory
        class being used and the filename on which the port is listening or
        indicates that the port is not listening.
        """
        class NewStyleFactory(object):
            def doStart(self):
                pass

            def doStop(self):
                pass

        # Sanity check
        self.assertIsInstance(NewStyleFactory, type)

        return self._reprTest(
            NewStyleFactory(), "twisted.test.test_unix.NewStyleFactory")



class ClientProto(protocol.ConnectedDatagramProtocol):
    started = stopped = False
    gotback = None

    def __init__(self):
        self.deferredStarted = defer.Deferred()
        self.deferredGotBack = defer.Deferred()

    def stopProtocol(self):
        self.stopped = True

    def startProtocol(self):
        self.started = True
        self.deferredStarted.callback(None)

    def datagramReceived(self, data):
        self.gotback = data
        self.deferredGotBack.callback(None)



class ServerProto(protocol.DatagramProtocol):
    started = stopped = False
    gotwhat = gotfrom = None

    def __init__(self):
        self.deferredStarted = defer.Deferred()
        self.deferredGotWhat = defer.Deferred()

    def stopProtocol(self):
        self.stopped = True

    def startProtocol(self):
        self.started = True
        self.deferredStarted.callback(None)

    def datagramReceived(self, data, addr):
        self.gotfrom = addr
        self.transport.write(b"hi back", addr)
        self.gotwhat = data
        self.deferredGotWhat.callback(None)



class DatagramUnixSocketTests(unittest.TestCase):
    """
    Test datagram UNIX sockets.
    """
    def test_exchange(self):
        """
        Test that a datagram can be sent to and received by a server and vice
        versa.
        """
        clientaddr = self.mktemp()
        serveraddr = self.mktemp()
        sp = ServerProto()
        cp = ClientProto()
        s = reactor.listenUNIXDatagram(serveraddr, sp)
        self.addCleanup(s.stopListening)
        c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)
        self.addCleanup(c.stopListening)

        d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
        def write(ignored):
            cp.transport.write(b"hi")
            return defer.gatherResults([sp.deferredGotWhat,
                                        cp.deferredGotBack])

        def _cbTestExchange(ignored):
            self.assertEqual(b"hi", sp.gotwhat)
            self.assertEqual(clientaddr, sp.gotfrom)
            self.assertEqual(b"hi back", cp.gotback)

        d.addCallback(write)
        d.addCallback(_cbTestExchange)
        return d


    def test_cannotListen(self):
        """
        L{IReactorUNIXDatagram.listenUNIXDatagram} raises
        L{error.CannotListenError} if the unix socket specified is already in
        use.
        """
        addr = self.mktemp()
        p = ServerProto()
        s = reactor.listenUNIXDatagram(addr, p)
        self.assertRaises(error.CannotListenError,
                          reactor.listenUNIXDatagram, addr, p)
        s.stopListening()
        os.unlink(addr)

    # test connecting to bound and connected (somewhere else) address

    def _reprTest(self, serverProto, protocolName):
        """
        Test the C{__str__} and C{__repr__} implementations of a UNIX datagram
        port when used with the given protocol.
        """
        filename = self.mktemp()
        unixPort = reactor.listenUNIXDatagram(filename, serverProto)

        connectedString = "<%s on %r>" % (protocolName, filename)
        self.assertEqual(repr(unixPort), connectedString)
        self.assertEqual(str(unixPort), connectedString)

        stopDeferred = defer.maybeDeferred(unixPort.stopListening)
        def stoppedListening(ign):
            unconnectedString = "<%s (not listening)>" % (protocolName,)
            self.assertEqual(repr(unixPort), unconnectedString)
            self.assertEqual(str(unixPort), unconnectedString)
        stopDeferred.addCallback(stoppedListening)
        return stopDeferred


    def test_reprWithClassicProtocol(self):
        """
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
        classic protocol class being used and the filename on which the port is
        listening or indicates that the port is not listening.
        """
        class ClassicProtocol:
            def makeConnection(self, transport):
                pass

            def doStop(self):
                pass

        # Sanity check
        self.assertIsInstance(ClassicProtocol, types.ClassType)

        return self._reprTest(
            ClassicProtocol(), "twisted.test.test_unix.ClassicProtocol")

    if _PY3:
        test_reprWithClassicProtocol.skip = (
            "Classic classes do not exist on Python 3.")


    def test_reprWithNewStyleProtocol(self):
        """
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
        new-style protocol class being used and the filename on which the port
        is listening or indicates that the port is not listening.
        """
        class NewStyleProtocol(object):
            def makeConnection(self, transport):
                pass

            def doStop(self):
                pass

        # Sanity check
        self.assertIsInstance(NewStyleProtocol, type)

        return self._reprTest(
            NewStyleProtocol(), "twisted.test.test_unix.NewStyleProtocol")



if not interfaces.IReactorUNIX(reactor, None):
    UnixSocketTests.skip = "This reactor does not support UNIX domain sockets"
if not interfaces.IReactorUNIXDatagram(reactor, None):
    DatagramUnixSocketTests.skip = "This reactor does not support UNIX datagram sockets"

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 103 B 0644
cert.pem.no_trailing_newline File 1.38 KB 0644
crash_test_dummy.py File 543 B 0644
iosim.py File 17.3 KB 0644
key.pem.no_trailing_newline File 1.67 KB 0644
mock_win32process.py File 1.46 KB 0644
myrebuilder1.py File 158 B 0644
myrebuilder2.py File 158 B 0644
plugin_basic.py File 943 B 0644
plugin_extra1.py File 407 B 0644
plugin_extra2.py File 579 B 0644
process_cmdline.py File 162 B 0644
process_echoer.py File 214 B 0644
process_fds.py File 945 B 0644
process_getargv.py File 283 B 0644
process_getenv.py File 268 B 0644
process_linger.py File 286 B 0644
process_reader.py File 188 B 0644
process_signal.py File 214 B 0644
process_stdinreader.py File 857 B 0644
process_tester.py File 1.01 KB 0644
process_tty.py File 130 B 0644
process_twisted.py File 1.18 KB 0644
proto_helpers.py File 26.33 KB 0644
raiser.c File 93.05 KB 0644
raiser.cpython-36m-x86_64-linux-gnu.so File 19.16 KB 0644
raiser.pyx File 466 B 0644
reflect_helper_IE.py File 61 B 0644
reflect_helper_VE.py File 82 B 0644
reflect_helper_ZDE.py File 47 B 0644
server.pem File 4.34 KB 0644
ssl_helpers.py File 1.01 KB 0644
stdio_test_consumer.py File 1.19 KB 0644
stdio_test_halfclose.py File 1.89 KB 0644
stdio_test_hostpeer.py File 1021 B 0644
stdio_test_lastwrite.py File 1.18 KB 0644
stdio_test_loseconn.py File 1.51 KB 0644
stdio_test_producer.py File 1.47 KB 0644
stdio_test_write.py File 923 B 0644
stdio_test_writeseq.py File 915 B 0644
test_abstract.py File 3.42 KB 0644
test_adbapi.py File 25.53 KB 0644
test_amp.py File 107.96 KB 0644
test_application.py File 32.05 KB 0644
test_compat.py File 27.32 KB 0644
test_context.py File 1.48 KB 0644
test_cooperator.py File 20.96 KB 0644
test_defer.py File 100.93 KB 0644
test_defgen.py File 10.45 KB 0644
test_dict.py File 1.41 KB 0644
test_dirdbm.py File 6.76 KB 0644
test_error.py File 8.39 KB 0644
test_factories.py File 4.53 KB 0644
test_failure.py File 29.92 KB 0644
test_fdesc.py File 7.2 KB 0644
test_finger.py File 1.95 KB 0644
test_formmethod.py File 3.56 KB 0644
test_ftp.py File 127.27 KB 0644
test_ftp_options.py File 2.62 KB 0644
test_htb.py File 3.12 KB 0644
test_ident.py File 6.85 KB 0644
test_internet.py File 45.33 KB 0644
test_iosim.py File 8.49 KB 0644
test_iutils.py File 13.13 KB 0644
test_lockfile.py File 15.14 KB 0644
test_log.py File 35.48 KB 0644
test_logfile.py File 17.8 KB 0644
test_loopback.py File 14.15 KB 0644
test_main.py File 2.44 KB 0644
test_memcache.py File 24.55 KB 0644
test_modules.py File 17.47 KB 0644
test_monkey.py File 5.5 KB 0644
test_nooldstyle.py File 5.82 KB 0644
test_paths.py File 72.61 KB 0644
test_pcp.py File 12.26 KB 0644
test_persisted.py File 14.28 KB 0644
test_plugin.py File 25.5 KB 0644
test_policies.py File 32.04 KB 0644
test_postfix.py File 3.53 KB 0644
test_process.py File 84.1 KB 0644
test_protocols.py File 7.28 KB 0644
test_randbytes.py File 3.28 KB 0644
test_rebuild.py File 8.3 KB 0644
test_reflect.py File 25.47 KB 0644
test_roots.py File 1.77 KB 0644
test_shortcut.py File 1.89 KB 0644
test_sip.py File 24.69 KB 0644
test_sob.py File 5.5 KB 0644
test_socks.py File 17.32 KB 0644
test_ssl.py File 23.29 KB 0644
test_sslverify.py File 104.28 KB 0644
test_stateful.py File 1.97 KB 0644
test_stdio.py File 12.85 KB 0644
test_strerror.py File 5.06 KB 0644
test_stringtransport.py File 12.95 KB 0644
test_strports.py File 1.75 KB 0644
test_task.py File 38.4 KB 0644
test_tcp.py File 64.07 KB 0644
test_tcp_internals.py File 8.54 KB 0644
test_text.py File 6.3 KB 0644
test_threadable.py File 3.65 KB 0644
test_threadpool.py File 22.47 KB 0644
test_threads.py File 12.96 KB 0644
test_tpfile.py File 1.56 KB 0644
test_twistd.py File 61.05 KB 0644
test_twisted.py File 18.42 KB 0644
test_udp.py File 24.1 KB 0644
test_unix.py File 14.8 KB 0644
test_usage.py File 23.09 KB 0644
testutils.py File 5.19 KB 0644