[ Avaa Bypassed ]



botdev@ ~ $
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.test.proto_helpers}.
"""

from zope.interface.verify import verifyObject

from twisted.internet.interfaces import (ITransport, IPushProducer, IConsumer,
    IReactorTCP, IReactorSSL, IReactorUNIX, IAddress, IListeningPort,
    IConnector)
from twisted.internet.address import IPv4Address
from twisted.trial.unittest import TestCase
from twisted.test.proto_helpers import (StringTransport, MemoryReactor,
    RaisingMemoryReactor, NonStreamingProducer)
from twisted.internet.protocol import ClientFactory, Factory

class StringTransportTests(TestCase):
    """
    Tests for L{twisted.test.proto_helpers.StringTransport}.
    """
    def setUp(self):
        """
        Create a fresh L{StringTransport} for each test.
        """
        self.transport = StringTransport()

    def test_interfaces(self):
        """
        L{StringTransport} instances provide L{ITransport}, L{IPushProducer},
        and L{IConsumer}.
        """
        self.assertTrue(verifyObject(ITransport, self.transport))
        self.assertTrue(verifyObject(IPushProducer, self.transport))
        self.assertTrue(verifyObject(IConsumer, self.transport))

    def test_registerProducer(self):
        """
        L{StringTransport.registerProducer} records the arguments supplied to
        it as instance attributes.
        """
        producer = object()
        streaming = object()
        self.transport.registerProducer(producer, streaming)
        self.assertIs(self.transport.producer, producer)
        self.assertIs(self.transport.streaming, streaming)

    def test_disallowedRegisterProducer(self):
        """
        L{StringTransport.registerProducer} raises L{RuntimeError} if a
        producer is already registered.
        """
        producer = object()
        self.transport.registerProducer(producer, True)
        self.assertRaises(
            RuntimeError, self.transport.registerProducer, object(), False)
        # The failed second registration must not disturb the first one.
        self.assertIs(self.transport.producer, producer)
        self.assertTrue(self.transport.streaming)

    def test_unregisterProducer(self):
        """
        L{StringTransport.unregisterProducer} causes the transport to forget
        about the registered producer and makes it possible to register a new
        one.
        """
        oldProducer = object()
        newProducer = object()
        self.transport.registerProducer(oldProducer, False)
        # Without this call the second registerProducer would raise.
        self.transport.unregisterProducer()
        self.assertIsNone(self.transport.producer)
        self.transport.registerProducer(newProducer, True)
        self.assertIs(self.transport.producer, newProducer)

    def test_invalidUnregisterProducer(self):
        """
        L{StringTransport.unregisterProducer} raises L{RuntimeError} if called
        when no producer is registered.
        """
        self.assertRaises(RuntimeError, self.transport.unregisterProducer)

    def test_initialProducerState(self):
        """
        L{StringTransport.producerState} is initially C{'producing'}.
        """
        self.assertEqual(self.transport.producerState, 'producing')

    def test_pauseProducing(self):
        """
        L{StringTransport.pauseProducing} changes the C{producerState} of the
        transport to C{'paused'}.
        """
        self.transport.pauseProducing()
        self.assertEqual(self.transport.producerState, 'paused')

    def test_resumeProducing(self):
        """
        L{StringTransport.resumeProducing} changes the C{producerState} of the
        transport to C{'producing'}.
        """
        # Pause first so that the resume is an observable state change
        # rather than a check of the initial state.
        self.transport.pauseProducing()
        self.transport.resumeProducing()
        self.assertEqual(self.transport.producerState, 'producing')

    def test_stopProducing(self):
        """
        L{StringTransport.stopProducing} changes the C{'producerState'} of the
        transport to C{'stopped'}.
        """
        self.transport.stopProducing()
        self.assertEqual(self.transport.producerState, 'stopped')

    def test_stoppedTransportCannotPause(self):
        """
        L{StringTransport.pauseProducing} raises L{RuntimeError} if the
        transport has been stopped.
        """
        self.transport.stopProducing()
        self.assertRaises(RuntimeError, self.transport.pauseProducing)

    def test_stoppedTransportCannotResume(self):
        """
        L{StringTransport.resumeProducing} raises L{RuntimeError} if the
        transport has been stopped.
        """
        self.transport.stopProducing()
        self.assertRaises(RuntimeError, self.transport.resumeProducing)

    def test_disconnectingTransportCannotPause(self):
        """
        L{StringTransport.pauseProducing} raises L{RuntimeError} if the
        transport is being disconnected.
        """
        self.transport.loseConnection()
        self.assertRaises(RuntimeError, self.transport.pauseProducing)

    def test_disconnectingTransportCannotResume(self):
        """
        L{StringTransport.resumeProducing} raises L{RuntimeError} if the
        transport is being disconnected.
        """
        self.transport.loseConnection()
        self.assertRaises(RuntimeError, self.transport.resumeProducing)

    def test_loseConnectionSetsDisconnecting(self):
        """
        L{StringTransport.loseConnection} toggles the C{disconnecting} instance
        variable to C{True}.
        """
        self.assertFalse(self.transport.disconnecting)
        self.transport.loseConnection()
        self.assertTrue(self.transport.disconnecting)

    def test_specifiedHostAddress(self):
        """
        If a host address is passed to L{StringTransport.__init__}, that
        value is returned from L{StringTransport.getHost}.
        """
        address = object()
        self.assertIs(StringTransport(address).getHost(), address)

    def test_specifiedPeerAddress(self):
        """
        If a peer address is passed to L{StringTransport.__init__}, that
        value is returned from L{StringTransport.getPeer}.
        """
        address = object()
        self.assertIs(
            StringTransport(peerAddress=address).getPeer(), address)

    def test_defaultHostAddress(self):
        """
        If no host address is passed to L{StringTransport.__init__}, an
        L{IPv4Address} is returned from L{StringTransport.getHost}.
        """
        address = StringTransport().getHost()
        self.assertIsInstance(address, IPv4Address)

    def test_defaultPeerAddress(self):
        """
        If no peer address is passed to L{StringTransport.__init__}, an
        L{IPv4Address} is returned from L{StringTransport.getPeer}.
        """
        address = StringTransport().getPeer()
        self.assertIsInstance(address, IPv4Address)

class ReactorTests(TestCase):
    """
    Tests for L{MemoryReactor} and L{RaisingMemoryReactor}.
    """

    def test_memoryReactorProvides(self):
        """
        L{MemoryReactor} provides all of the attributes described by the
        interfaces it advertises.
        """
        memoryReactor = MemoryReactor()
        verifyObject(IReactorTCP, memoryReactor)
        verifyObject(IReactorSSL, memoryReactor)
        verifyObject(IReactorUNIX, memoryReactor)

    def test_raisingReactorProvides(self):
        """
        L{RaisingMemoryReactor} provides all of the attributes described by the
        interfaces it advertises.
        """
        raisingReactor = RaisingMemoryReactor()
        verifyObject(IReactorTCP, raisingReactor)
        verifyObject(IReactorSSL, raisingReactor)
        verifyObject(IReactorUNIX, raisingReactor)

    def test_connectDestination(self):
        """
        L{MemoryReactor.connectTCP}, L{MemoryReactor.connectSSL}, and
        L{MemoryReactor.connectUNIX} will return an L{IConnector} whose
        C{getDestination} method returns an L{IAddress} with attributes which
        reflect the values passed.
        """
        memoryReactor = MemoryReactor()
        # The SSL context factory is irrelevant to the in-memory reactor, so
        # C{None} is passed, mirroring the listenSSL call in
        # test_listenDefaultHost.
        for connector in [memoryReactor.connectTCP(
                              "test.example.com", 8321, ClientFactory()),
                          memoryReactor.connectSSL(
                              "test.example.com", 8321, ClientFactory(),
                              None)]:
            verifyObject(IConnector, connector)
            address = connector.getDestination()
            verifyObject(IAddress, address)
            self.assertEqual(address.host, "test.example.com")
            self.assertEqual(address.port, 8321)
        connector = memoryReactor.connectUNIX(b"/fake/path", ClientFactory())
        verifyObject(IConnector, connector)
        address = connector.getDestination()
        verifyObject(IAddress, address)
        self.assertEqual(address.name, b"/fake/path")

    def test_listenDefaultHost(self):
        """
        L{MemoryReactor.listenTCP}, L{MemoryReactor.listenSSL} and
        L{MemoryReactor.listenUNIX} will return an L{IListeningPort} whose
        C{getHost} method returns an L{IAddress}; C{listenTCP} and C{listenSSL}
        will have a default host of C{''}, and a port that reflects the
        value passed, and C{listenUNIX} will have a name that reflects the path
        passed.
        """
        memoryReactor = MemoryReactor()
        for port in [memoryReactor.listenTCP(8242, Factory()),
                     memoryReactor.listenSSL(8242, Factory(), None)]:
            verifyObject(IListeningPort, port)
            address = port.getHost()
            verifyObject(IAddress, address)
            self.assertEqual(address.host, '')
            self.assertEqual(address.port, 8242)
        port = memoryReactor.listenUNIX(b"/path/to/socket", Factory())
        verifyObject(IListeningPort, port)
        address = port.getHost()
        verifyObject(IAddress, address)
        self.assertEqual(address.name, b"/path/to/socket")

    def test_readers(self):
        """
        Adding, removing, and listing readers works.
        """
        reader = object()
        reactor = MemoryReactor()

        reactor.addReader(reader)

        self.assertEqual(reactor.getReaders(), [reader])

        reactor.removeReader(reader)

        self.assertEqual(reactor.getReaders(), [])

    def test_writers(self):
        """
        Adding, removing, and listing writers works.
        """
        writer = object()
        reactor = MemoryReactor()

        reactor.addWriter(writer)

        self.assertEqual(reactor.getWriters(), [writer])

        reactor.removeWriter(writer)

        self.assertEqual(reactor.getWriters(), [])

class TestConsumer(object):
    """
    A very basic test consumer for use with the NonStreamingProducerTests.

    @ivar writes: Every piece of data passed to L{write}, in call order.
    @ivar producer: The currently registered producer, or L{None}.
    @ivar producerStreaming: The C{streaming} flag supplied with the
        currently registered producer, or L{None} when none is registered.
    """
    def __init__(self):
        self.writes = []
        self.producer = None
        self.producerStreaming = None

    def registerProducer(self, producer, streaming):
        """
        Registers a single producer with this consumer. Just keeps track of it.

        @param producer: The producer to register.
        @param streaming: Whether the producer is a streaming one or not.
        """
        self.producer = producer
        self.producerStreaming = streaming

    def unregisterProducer(self):
        """
        Forget the producer we had previously registered.
        """
        self.producer = None
        self.producerStreaming = None

    def write(self, data):
        """
        Some data was written to the consumer: stores it for later use.

        @param data: The data to write.
        """
        self.writes.append(data)

class NonStreamingProducerTests(TestCase):
    """
    Tests for the L{NonStreamingProducer} to validate behaviour.
    """
    def test_producesOnly10Times(self):
        """
        When the L{NonStreamingProducer} has resumeProducing called 10 times,
        it writes the counter each time and then fails.
        """
        consumer = TestConsumer()
        producer = NonStreamingProducer(consumer)
        consumer.registerProducer(producer, False)

        self.assertIs(consumer.producer, producer)
        self.assertIs(producer.consumer, consumer)

        for _ in range(10):
            producer.resumeProducing()

        # We should have unregistered the producer and printed the 10 results.
        expectedWrites = [
            b'0', b'1', b'2', b'3', b'4', b'5', b'6', b'7', b'8', b'9'
        ]
        self.assertIsNone(consumer.producer)
        self.assertIsNone(consumer.producerStreaming)
        self.assertEqual(consumer.writes, expectedWrites)

        # Another attempt to produce fails.
        self.assertRaises(RuntimeError, producer.resumeProducing)

    def test_cannotPauseProduction(self):
        """
        When the L{NonStreamingProducer} is paused, it raises a
        L{RuntimeError}.
        """
        consumer = TestConsumer()
        producer = NonStreamingProducer(consumer)
        consumer.registerProducer(producer, False)

        # Produce once, just to be safe.
        producer.resumeProducing()

        self.assertRaises(RuntimeError, producer.pauseProducing)


Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 103 B 0644
cert.pem.no_trailing_newline File 1.38 KB 0644
crash_test_dummy.py File 543 B 0644
iosim.py File 17.3 KB 0644
key.pem.no_trailing_newline File 1.67 KB 0644
mock_win32process.py File 1.46 KB 0644
myrebuilder1.py File 158 B 0644
myrebuilder2.py File 158 B 0644
plugin_basic.py File 943 B 0644
plugin_extra1.py File 407 B 0644
plugin_extra2.py File 579 B 0644
process_cmdline.py File 162 B 0644
process_echoer.py File 214 B 0644
process_fds.py File 945 B 0644
process_getargv.py File 283 B 0644
process_getenv.py File 268 B 0644
process_linger.py File 286 B 0644
process_reader.py File 188 B 0644
process_signal.py File 214 B 0644
process_stdinreader.py File 857 B 0644
process_tester.py File 1.01 KB 0644
process_tty.py File 130 B 0644
process_twisted.py File 1.18 KB 0644
proto_helpers.py File 26.33 KB 0644
raiser.c File 93.05 KB 0644
raiser.cpython-36m-x86_64-linux-gnu.so File 19.16 KB 0644
raiser.pyx File 466 B 0644
reflect_helper_IE.py File 61 B 0644
reflect_helper_VE.py File 82 B 0644
reflect_helper_ZDE.py File 47 B 0644
server.pem File 4.34 KB 0644
ssl_helpers.py File 1.01 KB 0644
stdio_test_consumer.py File 1.19 KB 0644
stdio_test_halfclose.py File 1.89 KB 0644
stdio_test_hostpeer.py File 1021 B 0644
stdio_test_lastwrite.py File 1.18 KB 0644
stdio_test_loseconn.py File 1.51 KB 0644
stdio_test_producer.py File 1.47 KB 0644
stdio_test_write.py File 923 B 0644
stdio_test_writeseq.py File 915 B 0644
test_abstract.py File 3.42 KB 0644
test_adbapi.py File 25.53 KB 0644
test_amp.py File 107.96 KB 0644
test_application.py File 32.05 KB 0644
test_compat.py File 27.32 KB 0644
test_context.py File 1.48 KB 0644
test_cooperator.py File 20.96 KB 0644
test_defer.py File 100.93 KB 0644
test_defgen.py File 10.45 KB 0644
test_dict.py File 1.41 KB 0644
test_dirdbm.py File 6.76 KB 0644
test_error.py File 8.39 KB 0644
test_factories.py File 4.53 KB 0644
test_failure.py File 29.92 KB 0644
test_fdesc.py File 7.2 KB 0644
test_finger.py File 1.95 KB 0644
test_formmethod.py File 3.56 KB 0644
test_ftp.py File 127.27 KB 0644
test_ftp_options.py File 2.62 KB 0644
test_htb.py File 3.12 KB 0644
test_ident.py File 6.85 KB 0644
test_internet.py File 45.33 KB 0644
test_iosim.py File 8.49 KB 0644
test_iutils.py File 13.13 KB 0644
test_lockfile.py File 15.14 KB 0644
test_log.py File 35.48 KB 0644
test_logfile.py File 17.8 KB 0644
test_loopback.py File 14.15 KB 0644
test_main.py File 2.44 KB 0644
test_memcache.py File 24.55 KB 0644
test_modules.py File 17.47 KB 0644
test_monkey.py File 5.5 KB 0644
test_nooldstyle.py File 5.82 KB 0644
test_paths.py File 72.61 KB 0644
test_pcp.py File 12.26 KB 0644
test_persisted.py File 14.28 KB 0644
test_plugin.py File 25.5 KB 0644
test_policies.py File 32.04 KB 0644
test_postfix.py File 3.53 KB 0644
test_process.py File 84.1 KB 0644
test_protocols.py File 7.28 KB 0644
test_randbytes.py File 3.28 KB 0644
test_rebuild.py File 8.3 KB 0644
test_reflect.py File 25.47 KB 0644
test_roots.py File 1.77 KB 0644
test_shortcut.py File 1.89 KB 0644
test_sip.py File 24.69 KB 0644
test_sob.py File 5.5 KB 0644
test_socks.py File 17.32 KB 0644
test_ssl.py File 23.29 KB 0644
test_sslverify.py File 104.28 KB 0644
test_stateful.py File 1.97 KB 0644
test_stdio.py File 12.85 KB 0644
test_strerror.py File 5.06 KB 0644
test_stringtransport.py File 12.95 KB 0644
test_strports.py File 1.75 KB 0644
test_task.py File 38.4 KB 0644
test_tcp.py File 64.07 KB 0644
test_tcp_internals.py File 8.54 KB 0644
test_text.py File 6.3 KB 0644
test_threadable.py File 3.65 KB 0644
test_threadpool.py File 22.47 KB 0644
test_threads.py File 12.96 KB 0644
test_tpfile.py File 1.56 KB 0644
test_twistd.py File 61.05 KB 0644
test_twisted.py File 18.42 KB 0644
test_udp.py File 24.1 KB 0644
test_unix.py File 14.8 KB 0644
test_usage.py File 23.09 KB 0644
testutils.py File 5.19 KB 0644