404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.14.135.249: ~ $
# -*- test-case-name: twisted.test.test_udp -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for implementations of L{IReactorUDP} and L{IReactorMulticast}.
"""

from __future__ import division, absolute_import

from twisted.trial import unittest

from twisted.python.compat import intToBytes
from twisted.internet.defer import Deferred, gatherResults, maybeDeferred
from twisted.internet import protocol, reactor, error, defer, interfaces, udp
from twisted.python import runtime


class Mixin:

    started = 0
    stopped = 0

    startedDeferred = None

    def __init__(self):
        self.packets = []

    def startProtocol(self):
        self.started = 1
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.callback(None)

    def stopProtocol(self):
        self.stopped = 1


class Server(Mixin, protocol.DatagramProtocol):
    packetReceived = None
    refused = 0


    def datagramReceived(self, data, addr):
        self.packets.append((data, addr))
        if self.packetReceived is not None:
            d, self.packetReceived = self.packetReceived, None
            d.callback(None)



class Client(Mixin, protocol.ConnectedDatagramProtocol):

    packetReceived = None
    refused = 0

    def datagramReceived(self, data):
        self.packets.append(data)
        if self.packetReceived is not None:
            d, self.packetReceived = self.packetReceived, None
            d.callback(None)

    def connectionFailed(self, failure):
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.errback(failure)
        self.failure = failure

    def connectionRefused(self):
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.errback(error.ConnectionRefusedError("yup"))
        self.refused = 1


class GoodClient(Server):

    def connectionRefused(self):
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.errback(error.ConnectionRefusedError("yup"))
        self.refused = 1



class BadClientError(Exception):
    """
    Raised by BadClient at the end of every datagramReceived call to try and
    screw stuff up.
    """



class BadClient(protocol.DatagramProtocol):
    """
    A DatagramProtocol which always raises an exception from datagramReceived.
    Used to test error handling behavior in the reactor for that method.
    """
    d = None

    def setDeferred(self, d):
        """
        Set the Deferred which will be called back when datagramReceived is
        called.
        """
        self.d = d


    def datagramReceived(self, bytes, addr):
        if self.d is not None:
            d, self.d = self.d, None
            d.callback(bytes)
        raise BadClientError("Application code is very buggy!")



class UDPTests(unittest.TestCase):

    def test_oldAddress(self):
        """
        The C{type} of the host address of a listening L{DatagramProtocol}'s
        transport is C{"UDP"}.
        """
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        p = reactor.listenUDP(0, server, interface="127.0.0.1")
        def cbStarted(ignored):
            addr = p.getHost()
            self.assertEqual(addr.type, 'UDP')
            return p.stopListening()
        return d.addCallback(cbStarted)


    def test_startStop(self):
        """
        The L{DatagramProtocol}'s C{startProtocol} and C{stopProtocol}
        methods are called when its transports starts and stops listening,
        respectively.
        """
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        port1 = reactor.listenUDP(0, server, interface="127.0.0.1")
        def cbStarted(ignored):
            self.assertEqual(server.started, 1)
            self.assertEqual(server.stopped, 0)
            return port1.stopListening()
        def cbStopped(ignored):
            self.assertEqual(server.stopped, 1)
        return d.addCallback(cbStarted).addCallback(cbStopped)


    def test_rebind(self):
        """
        Re-listening with the same L{DatagramProtocol} re-invokes the
        C{startProtocol} callback.
        """
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        p = reactor.listenUDP(0, server, interface="127.0.0.1")

        def cbStarted(ignored, port):
            return port.stopListening()

        def cbStopped(ignored):
            d = server.startedDeferred = defer.Deferred()
            p = reactor.listenUDP(0, server, interface="127.0.0.1")
            return d.addCallback(cbStarted, p)

        return d.addCallback(cbStarted, p)


    def test_bindError(self):
        """
        A L{CannotListenError} exception is raised when attempting to bind a
        second protocol instance to an already bound port
        """
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        port = reactor.listenUDP(0, server, interface='127.0.0.1')

        def cbStarted(ignored):
            self.assertEqual(port.getHost(), server.transport.getHost())
            server2 = Server()
            self.assertRaises(
                error.CannotListenError,
                reactor.listenUDP, port.getHost().port, server2,
                interface='127.0.0.1')
        d.addCallback(cbStarted)

        def cbFinished(ignored):
            return port.stopListening()
        d.addCallback(cbFinished)
        return d


    def test_sendPackets(self):
        """
        Datagrams can be sent with the transport's C{write} method and
        received via the C{datagramReceived} callback method.
        """
        server = Server()
        serverStarted = server.startedDeferred = defer.Deferred()
        port1 = reactor.listenUDP(0, server, interface="127.0.0.1")

        client = GoodClient()
        clientStarted = client.startedDeferred = defer.Deferred()

        def cbServerStarted(ignored):
            self.port2 = reactor.listenUDP(0, client, interface="127.0.0.1")
            return clientStarted

        d = serverStarted.addCallback(cbServerStarted)

        def cbClientStarted(ignored):
            client.transport.connect("127.0.0.1",
                                     server.transport.getHost().port)
            cAddr = client.transport.getHost()
            sAddr = server.transport.getHost()

            serverSend = client.packetReceived = defer.Deferred()
            server.transport.write(b"hello", (cAddr.host, cAddr.port))

            clientWrites = [
                (b"a",),
                (b"b", None),
                (b"c", (sAddr.host, sAddr.port))]

            def cbClientSend(ignored):
                if clientWrites:
                    nextClientWrite = server.packetReceived = defer.Deferred()
                    nextClientWrite.addCallback(cbClientSend)
                    client.transport.write(*clientWrites.pop(0))
                    return nextClientWrite

            # No one will ever call .errback on either of these Deferreds,
            # but there is a non-trivial amount of test code which might
            # cause them to fail somehow.  So fireOnOneErrback=True.
            return defer.DeferredList([
                cbClientSend(None),
                serverSend],
                fireOnOneErrback=True)

        d.addCallback(cbClientStarted)

        def cbSendsFinished(ignored):
            cAddr = client.transport.getHost()
            sAddr = server.transport.getHost()
            self.assertEqual(
                client.packets,
                [(b"hello", (sAddr.host, sAddr.port))])
            clientAddr = (cAddr.host, cAddr.port)
            self.assertEqual(
                server.packets,
                [(b"a", clientAddr),
                 (b"b", clientAddr),
                 (b"c", clientAddr)])

        d.addCallback(cbSendsFinished)

        def cbFinished(ignored):
            return defer.DeferredList([
                defer.maybeDeferred(port1.stopListening),
                defer.maybeDeferred(self.port2.stopListening)],
                fireOnOneErrback=True)

        d.addCallback(cbFinished)
        return d



    def test_connectionRefused(self):
        """
        A L{ConnectionRefusedError} exception is raised when a connection
        attempt is actively refused by the other end.

        Note: This test assumes no one is listening on port 80 UDP.
        """
        client = GoodClient()
        clientStarted = client.startedDeferred = defer.Deferred()
        port = reactor.listenUDP(0, client, interface="127.0.0.1")

        server = Server()
        serverStarted = server.startedDeferred = defer.Deferred()
        port2 = reactor.listenUDP(0, server, interface="127.0.0.1")

        d = defer.DeferredList(
            [clientStarted, serverStarted],
            fireOnOneErrback=True)

        def cbStarted(ignored):
            connectionRefused = client.startedDeferred = defer.Deferred()
            client.transport.connect("127.0.0.1", 80)

            for i in range(10):
                client.transport.write(intToBytes(i))
                server.transport.write(intToBytes(i), ("127.0.0.1", 80))

            return self.assertFailure(
                connectionRefused,
                error.ConnectionRefusedError)

        d.addCallback(cbStarted)

        def cbFinished(ignored):
            return defer.DeferredList([
                defer.maybeDeferred(port.stopListening),
                defer.maybeDeferred(port2.stopListening)],
                fireOnOneErrback=True)

        d.addCallback(cbFinished)
        return d


    def test_badConnect(self):
        """
        A call to the transport's connect method fails with an
        L{InvalidAddressError} when a non-IP address is passed as the host
        value.

        A call to a transport's connect method fails with a L{RuntimeError}
        when the transport is already connected.
        """
        client = GoodClient()
        port = reactor.listenUDP(0, client, interface="127.0.0.1")
        self.assertRaises(error.InvalidAddressError, client.transport.connect,
                          "localhost", 80)
        client.transport.connect("127.0.0.1", 80)
        self.assertRaises(RuntimeError, client.transport.connect,
                          "127.0.0.1", 80)
        return port.stopListening()



    def test_datagramReceivedError(self):
        """
        When datagramReceived raises an exception it is logged but the port
        is not disconnected.
        """
        finalDeferred = defer.Deferred()

        def cbCompleted(ign):
            """
            Flush the exceptions which the reactor should have logged and make
            sure they're actually there.
            """
            errs = self.flushLoggedErrors(BadClientError)
            self.assertEqual(len(errs), 2, "Incorrectly found %d errors, expected 2" % (len(errs),))
        finalDeferred.addCallback(cbCompleted)

        client = BadClient()
        port = reactor.listenUDP(0, client, interface='127.0.0.1')

        def cbCleanup(result):
            """
            Disconnect the port we started and pass on whatever was given to us
            in case it was a Failure.
            """
            return defer.maybeDeferred(port.stopListening).addBoth(lambda ign: result)
        finalDeferred.addBoth(cbCleanup)

        addr = port.getHost()

        # UDP is not reliable.  Try to send as many as 60 packets before giving
        # up.  Conceivably, all sixty could be lost, but they probably won't be
        # unless all UDP traffic is being dropped, and then the rest of these
        # UDP tests will likely fail as well.  Ideally, this test (and probably
        # others) wouldn't even use actual UDP traffic: instead, they would
        # stub out the socket with a fake one which could be made to behave in
        # whatever way the test desires.  Unfortunately, this is hard because
        # of differences in various reactor implementations.
        attempts = list(range(60))
        succeededAttempts = []

        def makeAttempt():
            """
            Send one packet to the listening BadClient.  Set up a 0.1 second
            timeout to do re-transmits in case the packet is dropped.  When two
            packets have been received by the BadClient, stop sending and let
            the finalDeferred's callbacks do some assertions.
            """
            if not attempts:
                try:
                    self.fail("Not enough packets received")
                except:
                    finalDeferred.errback()

            self.failIfIdentical(client.transport, None, "UDP Protocol lost its transport")

            packet = intToBytes(attempts.pop(0))
            packetDeferred = defer.Deferred()
            client.setDeferred(packetDeferred)
            client.transport.write(packet, (addr.host, addr.port))

            def cbPacketReceived(packet):
                """
                A packet arrived.  Cancel the timeout for it, record it, and
                maybe finish the test.
                """
                timeoutCall.cancel()
                succeededAttempts.append(packet)
                if len(succeededAttempts) == 2:
                    # The second error has not yet been logged, since the
                    # exception which causes it hasn't even been raised yet.
                    # Give the datagramReceived call a chance to finish, then
                    # let the test finish asserting things.
                    reactor.callLater(0, finalDeferred.callback, None)
                else:
                    makeAttempt()

            def ebPacketTimeout(err):
                """
                The packet wasn't received quickly enough.  Try sending another
                one.  It doesn't matter if the packet for which this was the
                timeout eventually arrives: makeAttempt throws away the
                Deferred on which this function is the errback, so when
                datagramReceived callbacks, so it won't be on this Deferred, so
                it won't raise an AlreadyCalledError.
                """
                makeAttempt()

            packetDeferred.addCallbacks(cbPacketReceived, ebPacketTimeout)
            packetDeferred.addErrback(finalDeferred.errback)

            timeoutCall = reactor.callLater(
                0.1, packetDeferred.errback,
                error.TimeoutError(
                    "Timed out in testDatagramReceivedError"))

        makeAttempt()
        return finalDeferred


    def test_NoWarningOnBroadcast(self):
        """
        C{'<broadcast>'} is an alternative way to say C{'255.255.255.255'}
        ({socket.gethostbyname("<broadcast>")} returns C{'255.255.255.255'}),
        so because it becomes a valid IP address, no deprecation warning about
        passing hostnames to L{twisted.internet.udp.Port.write} needs to be
        emitted by C{write()} in this case.
        """
        class fakeSocket:
            def sendto(self, foo, bar):
                pass

        p = udp.Port(0, Server())
        p.socket = fakeSocket()
        p.write(b"test", ("<broadcast>", 1234))

        warnings = self.flushWarnings([self.test_NoWarningOnBroadcast])
        self.assertEqual(len(warnings), 0)



class ReactorShutdownInteractionTests(unittest.TestCase):
    """Test reactor shutdown interaction"""

    def setUp(self):
        """Start a UDP port"""
        self.server = Server()
        self.port = reactor.listenUDP(0, self.server, interface='127.0.0.1')

    def tearDown(self):
        """Stop the UDP port"""
        return self.port.stopListening()

    def testShutdownFromDatagramReceived(self):
        """Test reactor shutdown while in a recvfrom() loop"""

        # udp.Port's doRead calls recvfrom() in a loop, as an optimization.
        # It is important this loop terminate under various conditions.
        # Previously, if datagramReceived synchronously invoked
        # reactor.stop(), under certain reactors, the Port's socket would
        # synchronously disappear, causing an AttributeError inside that
        # loop.  This was mishandled, causing the loop to spin forever.
        # This test is primarily to ensure that the loop never spins
        # forever.

        finished = defer.Deferred()
        pr = self.server.packetReceived = defer.Deferred()

        def pktRece(ignored):
            # Simulate reactor.stop() behavior :(
            self.server.transport.connectionLost()
            # Then delay this Deferred chain until the protocol has been
            # disconnected, as the reactor should do in an error condition
            # such as we are inducing.  This is very much a whitebox test.
            reactor.callLater(0, finished.callback, None)
        pr.addCallback(pktRece)

        def flushErrors(ignored):
            # We are breaking abstraction and calling private APIs, any
            # number of horrible errors might occur.  As long as the reactor
            # doesn't hang, this test is satisfied.  (There may be room for
            # another, stricter test.)
            self.flushLoggedErrors()
        finished.addCallback(flushErrors)
        self.server.transport.write(b'\0' * 64, ('127.0.0.1',
                                    self.server.transport.getHost().port))
        return finished



class MulticastTests(unittest.TestCase):

    def setUp(self):
        self.server = Server()
        self.client = Client()
        # multicast won't work if we listen over loopback, apparently
        self.port1 = reactor.listenMulticast(0, self.server)
        self.port2 = reactor.listenMulticast(0, self.client)
        self.client.transport.connect(
            "127.0.0.1", self.server.transport.getHost().port)


    def tearDown(self):
        return gatherResults([
            maybeDeferred(self.port1.stopListening),
            maybeDeferred(self.port2.stopListening)])


    def testTTL(self):
        for o in self.client, self.server:
            self.assertEqual(o.transport.getTTL(), 1)
            o.transport.setTTL(2)
            self.assertEqual(o.transport.getTTL(), 2)


    def test_loopback(self):
        """
        Test that after loopback mode has been set, multicast packets are
        delivered to their sender.
        """
        self.assertEqual(self.server.transport.getLoopbackMode(), 1)
        addr = self.server.transport.getHost()
        joined = self.server.transport.joinGroup("225.0.0.250")

        def cbJoined(ignored):
            d = self.server.packetReceived = Deferred()
            self.server.transport.write(b"hello", ("225.0.0.250", addr.port))
            return d
        joined.addCallback(cbJoined)

        def cbPacket(ignored):
            self.assertEqual(len(self.server.packets), 1)
            self.server.transport.setLoopbackMode(0)
            self.assertEqual(self.server.transport.getLoopbackMode(), 0)
            self.server.transport.write(b"hello", ("225.0.0.250", addr.port))

            # This is fairly lame.
            d = Deferred()
            reactor.callLater(0, d.callback, None)
            return d
        joined.addCallback(cbPacket)

        def cbNoPacket(ignored):
            self.assertEqual(len(self.server.packets), 1)
        joined.addCallback(cbNoPacket)

        return joined


    def test_interface(self):
        """
        Test C{getOutgoingInterface} and C{setOutgoingInterface}.
        """
        self.assertEqual(
            self.client.transport.getOutgoingInterface(), "0.0.0.0")
        self.assertEqual(
            self.server.transport.getOutgoingInterface(), "0.0.0.0")

        d1 = self.client.transport.setOutgoingInterface("127.0.0.1")
        d2 = self.server.transport.setOutgoingInterface("127.0.0.1")
        result = gatherResults([d1, d2])

        def cbInterfaces(ignored):
            self.assertEqual(
                self.client.transport.getOutgoingInterface(), "127.0.0.1")
            self.assertEqual(
                self.server.transport.getOutgoingInterface(), "127.0.0.1")
        result.addCallback(cbInterfaces)
        return result


    def test_joinLeave(self):
        """
        Test that multicast a group can be joined and left.
        """
        d = self.client.transport.joinGroup("225.0.0.250")

        def clientJoined(ignored):
            return self.client.transport.leaveGroup("225.0.0.250")
        d.addCallback(clientJoined)

        def clientLeft(ignored):
            return self.server.transport.joinGroup("225.0.0.250")
        d.addCallback(clientLeft)

        def serverJoined(ignored):
            return self.server.transport.leaveGroup("225.0.0.250")
        d.addCallback(serverJoined)

        return d


    def test_joinFailure(self):
        """
        Test that an attempt to join an address which is not a multicast
        address fails with L{error.MulticastJoinError}.
        """
        # 127.0.0.1 is not a multicast address, so joining it should fail.
        return self.assertFailure(
            self.client.transport.joinGroup("127.0.0.1"),
            error.MulticastJoinError)
    if runtime.platform.isWindows() and not runtime.platform.isVista():
        # FIXME: https://twistedmatrix.com/trac/ticket/7780
        test_joinFailure.skip = (
            "Windows' UDP multicast is not yet fully supported.")


    def test_multicast(self):
        """
        Test that a multicast group can be joined and messages sent to and
        received from it.
        """
        c = Server()
        p = reactor.listenMulticast(0, c)
        addr = self.server.transport.getHost()

        joined = self.server.transport.joinGroup("225.0.0.250")

        def cbJoined(ignored):
            d = self.server.packetReceived = Deferred()
            c.transport.write(b"hello world", ("225.0.0.250", addr.port))
            return d
        joined.addCallback(cbJoined)

        def cbPacket(ignored):
            self.assertEqual(self.server.packets[0][0], b"hello world")
        joined.addCallback(cbPacket)

        def cleanup(passthrough):
            result = maybeDeferred(p.stopListening)
            result.addCallback(lambda ign: passthrough)
            return result
        joined.addCallback(cleanup)

        return joined


    def test_multiListen(self):
        """
        Test that multiple sockets can listen on the same multicast port and
        that they both receive multicast messages directed to that address.
        """
        firstClient = Server()
        firstPort = reactor.listenMulticast(
            0, firstClient, listenMultiple=True)

        portno = firstPort.getHost().port

        secondClient = Server()
        secondPort = reactor.listenMulticast(
            portno, secondClient, listenMultiple=True)

        theGroup = "225.0.0.250"
        joined = gatherResults([self.server.transport.joinGroup(theGroup),
                                firstPort.joinGroup(theGroup),
                                secondPort.joinGroup(theGroup)])


        def serverJoined(ignored):
            d1 = firstClient.packetReceived = Deferred()
            d2 = secondClient.packetReceived = Deferred()
            firstClient.transport.write(b"hello world", (theGroup, portno))
            return gatherResults([d1, d2])
        joined.addCallback(serverJoined)

        def gotPackets(ignored):
            self.assertEqual(firstClient.packets[0][0], b"hello world")
            self.assertEqual(secondClient.packets[0][0], b"hello world")
        joined.addCallback(gotPackets)

        def cleanup(passthrough):
            result = gatherResults([
                maybeDeferred(firstPort.stopListening),
                maybeDeferred(secondPort.stopListening)])
            result.addCallback(lambda ign: passthrough)
            return result
        joined.addBoth(cleanup)
        return joined
    if runtime.platform.isWindows():
        test_multiListen.skip = ("on non-linux platforms it appears multiple "
                                 "processes can listen, but not multiple sockets "
                                 "in same process?")


if not interfaces.IReactorUDP(reactor, None):
    UDPTests.skip = "This reactor does not support UDP"
    ReactorShutdownInteractionTests.skip = "This reactor does not support UDP"
if not interfaces.IReactorMulticast(reactor, None):
    MulticastTests.skip = "This reactor does not support multicast"

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 103 B 0644
cert.pem.no_trailing_newline File 1.38 KB 0644
crash_test_dummy.py File 543 B 0644
iosim.py File 17.3 KB 0644
key.pem.no_trailing_newline File 1.67 KB 0644
mock_win32process.py File 1.46 KB 0644
myrebuilder1.py File 158 B 0644
myrebuilder2.py File 158 B 0644
plugin_basic.py File 943 B 0644
plugin_extra1.py File 407 B 0644
plugin_extra2.py File 579 B 0644
process_cmdline.py File 162 B 0644
process_echoer.py File 214 B 0644
process_fds.py File 945 B 0644
process_getargv.py File 283 B 0644
process_getenv.py File 268 B 0644
process_linger.py File 286 B 0644
process_reader.py File 188 B 0644
process_signal.py File 214 B 0644
process_stdinreader.py File 857 B 0644
process_tester.py File 1.01 KB 0644
process_tty.py File 130 B 0644
process_twisted.py File 1.18 KB 0644
proto_helpers.py File 26.33 KB 0644
raiser.c File 93.05 KB 0644
raiser.cpython-36m-x86_64-linux-gnu.so File 19.16 KB 0644
raiser.pyx File 466 B 0644
reflect_helper_IE.py File 61 B 0644
reflect_helper_VE.py File 82 B 0644
reflect_helper_ZDE.py File 47 B 0644
server.pem File 4.34 KB 0644
ssl_helpers.py File 1.01 KB 0644
stdio_test_consumer.py File 1.19 KB 0644
stdio_test_halfclose.py File 1.89 KB 0644
stdio_test_hostpeer.py File 1021 B 0644
stdio_test_lastwrite.py File 1.18 KB 0644
stdio_test_loseconn.py File 1.51 KB 0644
stdio_test_producer.py File 1.47 KB 0644
stdio_test_write.py File 923 B 0644
stdio_test_writeseq.py File 915 B 0644
test_abstract.py File 3.42 KB 0644
test_adbapi.py File 25.53 KB 0644
test_amp.py File 107.96 KB 0644
test_application.py File 32.05 KB 0644
test_compat.py File 27.32 KB 0644
test_context.py File 1.48 KB 0644
test_cooperator.py File 20.96 KB 0644
test_defer.py File 100.93 KB 0644
test_defgen.py File 10.45 KB 0644
test_dict.py File 1.41 KB 0644
test_dirdbm.py File 6.76 KB 0644
test_error.py File 8.39 KB 0644
test_factories.py File 4.53 KB 0644
test_failure.py File 29.92 KB 0644
test_fdesc.py File 7.2 KB 0644
test_finger.py File 1.95 KB 0644
test_formmethod.py File 3.56 KB 0644
test_ftp.py File 127.27 KB 0644
test_ftp_options.py File 2.62 KB 0644
test_htb.py File 3.12 KB 0644
test_ident.py File 6.85 KB 0644
test_internet.py File 45.33 KB 0644
test_iosim.py File 8.49 KB 0644
test_iutils.py File 13.13 KB 0644
test_lockfile.py File 15.14 KB 0644
test_log.py File 35.48 KB 0644
test_logfile.py File 17.8 KB 0644
test_loopback.py File 14.15 KB 0644
test_main.py File 2.44 KB 0644
test_memcache.py File 24.55 KB 0644
test_modules.py File 17.47 KB 0644
test_monkey.py File 5.5 KB 0644
test_nooldstyle.py File 5.82 KB 0644
test_paths.py File 72.61 KB 0644
test_pcp.py File 12.26 KB 0644
test_persisted.py File 14.28 KB 0644
test_plugin.py File 25.5 KB 0644
test_policies.py File 32.04 KB 0644
test_postfix.py File 3.53 KB 0644
test_process.py File 84.1 KB 0644
test_protocols.py File 7.28 KB 0644
test_randbytes.py File 3.28 KB 0644
test_rebuild.py File 8.3 KB 0644
test_reflect.py File 25.47 KB 0644
test_roots.py File 1.77 KB 0644
test_shortcut.py File 1.89 KB 0644
test_sip.py File 24.69 KB 0644
test_sob.py File 5.5 KB 0644
test_socks.py File 17.32 KB 0644
test_ssl.py File 23.29 KB 0644
test_sslverify.py File 104.28 KB 0644
test_stateful.py File 1.97 KB 0644
test_stdio.py File 12.85 KB 0644
test_strerror.py File 5.06 KB 0644
test_stringtransport.py File 12.95 KB 0644
test_strports.py File 1.75 KB 0644
test_task.py File 38.4 KB 0644
test_tcp.py File 64.07 KB 0644
test_tcp_internals.py File 8.54 KB 0644
test_text.py File 6.3 KB 0644
test_threadable.py File 3.65 KB 0644
test_threadpool.py File 22.47 KB 0644
test_threads.py File 12.96 KB 0644
test_tpfile.py File 1.56 KB 0644
test_twistd.py File 61.05 KB 0644
test_twisted.py File 18.42 KB 0644
test_udp.py File 24.1 KB 0644
test_unix.py File 14.8 KB 0644
test_usage.py File 23.09 KB 0644
testutils.py File 5.19 KB 0644