# -*- Python -*- # Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. __version__ = '$Revision: 1.5 $'[11:-2] from twisted.trial import unittest from twisted.protocols import pcp # Goal: # Take a Protocol instance. Own all outgoing data - anything that # would go to p.transport.write. Own all incoming data - anything # that comes to p.dataReceived. # I need: # Something with the AbstractFileDescriptor interface. # That is: # - acts as a Transport # - has a method write() # - which buffers # - acts as a Consumer # - has a registerProducer, unRegisterProducer # - tells the Producer to back off (pauseProducing) when its buffer is full. # - tells the Producer to resumeProducing when its buffer is not so full. # - acts as a Producer # - calls registerProducer # - calls write() on consumers # - honors requests to pause/resume producing # - honors stopProducing, and passes it along to upstream Producers class DummyTransport: """A dumb transport to wrap around.""" def __init__(self): self._writes = [] def write(self, data): self._writes.append(data) def getvalue(self): return ''.join(self._writes) class DummyProducer: resumed = False stopped = False paused = False def __init__(self, consumer): self.consumer = consumer def resumeProducing(self): self.resumed = True self.paused = False def pauseProducing(self): self.paused = True def stopProducing(self): self.stopped = True class DummyConsumer(DummyTransport): producer = None finished = False unregistered = True def registerProducer(self, producer, streaming): self.producer = (producer, streaming) def unregisterProducer(self): self.unregistered = True def finish(self): self.finished = True class TransportInterfaceTests(unittest.TestCase): proxyClass = pcp.BasicProducerConsumerProxy def setUp(self): self.underlying = DummyConsumer() self.transport = self.proxyClass(self.underlying) def testWrite(self): self.transport.write("some bytes") class ConsumerInterfaceTest: """Test ProducerConsumerProxy as a Consumer. Normally we have ProducingServer -> ConsumingTransport. If I am to go between (Server -> Shaper -> Transport), I have to play the role of Consumer convincingly for the ProducingServer. """ def setUp(self): self.underlying = DummyConsumer() self.consumer = self.proxyClass(self.underlying) self.producer = DummyProducer(self.consumer) def testRegisterPush(self): self.consumer.registerProducer(self.producer, True) ## Consumer should NOT have called PushProducer.resumeProducing self.assertFalse(self.producer.resumed) ## I'm I'm just a proxy, should I only do resumeProducing when ## I get poked myself? #def testRegisterPull(self): # self.consumer.registerProducer(self.producer, False) # ## Consumer SHOULD have called PushProducer.resumeProducing # self.assertTrue(self.producer.resumed) def testUnregister(self): self.consumer.registerProducer(self.producer, False) self.consumer.unregisterProducer() # Now when the consumer would ordinarily want more data, it # shouldn't ask producer for it. # The most succinct way to trigger "want more data" is to proxy for # a PullProducer and have someone ask me for data. self.producer.resumed = False self.consumer.resumeProducing() self.assertFalse(self.producer.resumed) def testFinish(self): self.consumer.registerProducer(self.producer, False) self.consumer.finish() # I guess finish should behave like unregister? self.producer.resumed = False self.consumer.resumeProducing() self.assertFalse(self.producer.resumed) class ProducerInterfaceTest: """Test ProducerConsumerProxy as a Producer. Normally we have ProducingServer -> ConsumingTransport. If I am to go between (Server -> Shaper -> Transport), I have to play the role of Producer convincingly for the ConsumingTransport. """ def setUp(self): self.consumer = DummyConsumer() self.producer = self.proxyClass(self.consumer) def testRegistersProducer(self): self.assertEqual(self.consumer.producer[0], self.producer) def testPause(self): self.producer.pauseProducing() self.producer.write("yakkity yak") self.assertFalse(self.consumer.getvalue(), "Paused producer should not have sent data.") def testResume(self): self.producer.pauseProducing() self.producer.resumeProducing() self.producer.write("yakkity yak") self.assertEqual(self.consumer.getvalue(), "yakkity yak") def testResumeNoEmptyWrite(self): self.producer.pauseProducing() self.producer.resumeProducing() self.assertEqual(len(self.consumer._writes), 0, "Resume triggered an empty write.") def testResumeBuffer(self): self.producer.pauseProducing() self.producer.write("buffer this") self.producer.resumeProducing() self.assertEqual(self.consumer.getvalue(), "buffer this") def testStop(self): self.producer.stopProducing() self.producer.write("yakkity yak") self.assertFalse(self.consumer.getvalue(), "Stopped producer should not have sent data.") class PCP_ConsumerInterfaceTests(ConsumerInterfaceTest, unittest.TestCase): proxyClass = pcp.BasicProducerConsumerProxy class PCPII_ConsumerInterfaceTests(ConsumerInterfaceTest, unittest.TestCase): proxyClass = pcp.ProducerConsumerProxy class PCP_ProducerInterfaceTests(ProducerInterfaceTest, unittest.TestCase): proxyClass = pcp.BasicProducerConsumerProxy class PCPII_ProducerInterfaceTests(ProducerInterfaceTest, unittest.TestCase): proxyClass = pcp.ProducerConsumerProxy class ProducerProxyTests(unittest.TestCase): """Producer methods on me should be relayed to the Producer I proxy. """ proxyClass = pcp.BasicProducerConsumerProxy def setUp(self): self.proxy = self.proxyClass(None) self.parentProducer = DummyProducer(self.proxy) self.proxy.registerProducer(self.parentProducer, True) def testStop(self): self.proxy.stopProducing() self.assertTrue(self.parentProducer.stopped) class ConsumerProxyTests(unittest.TestCase): """Consumer methods on me should be relayed to the Consumer I proxy. """ proxyClass = pcp.BasicProducerConsumerProxy def setUp(self): self.underlying = DummyConsumer() self.consumer = self.proxyClass(self.underlying) def testWrite(self): # NOTE: This test only valid for streaming (Push) systems. self.consumer.write("some bytes") self.assertEqual(self.underlying.getvalue(), "some bytes") def testFinish(self): self.consumer.finish() self.assertTrue(self.underlying.finished) def testUnregister(self): self.consumer.unregisterProducer() self.assertTrue(self.underlying.unregistered) class PullProducerTest: def setUp(self): self.underlying = DummyConsumer() self.proxy = self.proxyClass(self.underlying) self.parentProducer = DummyProducer(self.proxy) self.proxy.registerProducer(self.parentProducer, True) def testHoldWrites(self): self.proxy.write("hello") # Consumer should get no data before it says resumeProducing. self.assertFalse(self.underlying.getvalue(), "Pulling Consumer got data before it pulled.") def testPull(self): self.proxy.write("hello") self.proxy.resumeProducing() self.assertEqual(self.underlying.getvalue(), "hello") def testMergeWrites(self): self.proxy.write("hello ") self.proxy.write("sunshine") self.proxy.resumeProducing() nwrites = len(self.underlying._writes) self.assertEqual(nwrites, 1, "Pull resulted in %d writes instead " "of 1." % (nwrites,)) self.assertEqual(self.underlying.getvalue(), "hello sunshine") def testLateWrite(self): # consumer sends its initial pull before we have data self.proxy.resumeProducing() self.proxy.write("data") # This data should answer that pull request. self.assertEqual(self.underlying.getvalue(), "data") class PCP_PullProducerTests(PullProducerTest, unittest.TestCase): class proxyClass(pcp.BasicProducerConsumerProxy): iAmStreaming = False class PCPII_PullProducerTests(PullProducerTest, unittest.TestCase): class proxyClass(pcp.ProducerConsumerProxy): iAmStreaming = False # Buffering! class BufferedConsumerTests(unittest.TestCase): """As a consumer, ask the producer to pause after too much data.""" proxyClass = pcp.ProducerConsumerProxy def setUp(self): self.underlying = DummyConsumer() self.proxy = self.proxyClass(self.underlying) self.proxy.bufferSize = 100 self.parentProducer = DummyProducer(self.proxy) self.proxy.registerProducer(self.parentProducer, True) def testRegisterPull(self): self.proxy.registerProducer(self.parentProducer, False) ## Consumer SHOULD have called PushProducer.resumeProducing self.assertTrue(self.parentProducer.resumed) def testPauseIntercept(self): self.proxy.pauseProducing() self.assertFalse(self.parentProducer.paused) def testResumeIntercept(self): self.proxy.pauseProducing() self.proxy.resumeProducing() # With a streaming producer, just because the proxy was resumed is # not necessarily a reason to resume the parent producer. The state # of the buffer should decide that. self.assertFalse(self.parentProducer.resumed) def testTriggerPause(self): """Make sure I say \"when.\"""" # Pause the proxy so data sent to it builds up in its buffer. self.proxy.pauseProducing() self.assertFalse(self.parentProducer.paused, "don't pause yet") self.proxy.write("x" * 51) self.assertFalse(self.parentProducer.paused, "don't pause yet") self.proxy.write("x" * 51) self.assertTrue(self.parentProducer.paused) def testTriggerResume(self): """Make sure I resumeProducing when my buffer empties.""" self.proxy.pauseProducing() self.proxy.write("x" * 102) self.assertTrue(self.parentProducer.paused, "should be paused") self.proxy.resumeProducing() # Resuming should have emptied my buffer, so I should tell my # parent to resume too. self.assertFalse(self.parentProducer.paused, "Producer should have resumed.") self.assertFalse(self.proxy.producerPaused) class BufferedPullTests(unittest.TestCase): class proxyClass(pcp.ProducerConsumerProxy): iAmStreaming = False def _writeSomeData(self, data): pcp.ProducerConsumerProxy._writeSomeData(self, data[:100]) return min(len(data), 100) def setUp(self): self.underlying = DummyConsumer() self.proxy = self.proxyClass(self.underlying) self.proxy.bufferSize = 100 self.parentProducer = DummyProducer(self.proxy) self.proxy.registerProducer(self.parentProducer, False) def testResumePull(self): # If proxy has no data to send on resumeProducing, it had better pull # some from its PullProducer. self.parentProducer.resumed = False self.proxy.resumeProducing() self.assertTrue(self.parentProducer.resumed) def testLateWriteBuffering(self): # consumer sends its initial pull before we have data self.proxy.resumeProducing() self.proxy.write("datum" * 21) # This data should answer that pull request. self.assertEqual(self.underlying.getvalue(), "datum" * 20) # but there should be some left over self.assertEqual(self.proxy._buffer, ["datum"]) # TODO: # test that web request finishing bug (when we weren't proxying # unregisterProducer but were proxying finish, web file transfers # would hang on the last block.) # test what happens if writeSomeBytes decided to write zero bytes.
Name | Type | Size | Permission | Actions |
---|---|---|---|---|
__pycache__ | Folder | 0755 |
|
|
__init__.py | File | 103 B | 0644 |
|
cert.pem.no_trailing_newline | File | 1.38 KB | 0644 |
|
crash_test_dummy.py | File | 543 B | 0644 |
|
iosim.py | File | 17.3 KB | 0644 |
|
key.pem.no_trailing_newline | File | 1.67 KB | 0644 |
|
mock_win32process.py | File | 1.46 KB | 0644 |
|
myrebuilder1.py | File | 158 B | 0644 |
|
myrebuilder2.py | File | 158 B | 0644 |
|
plugin_basic.py | File | 943 B | 0644 |
|
plugin_extra1.py | File | 407 B | 0644 |
|
plugin_extra2.py | File | 579 B | 0644 |
|
process_cmdline.py | File | 162 B | 0644 |
|
process_echoer.py | File | 214 B | 0644 |
|
process_fds.py | File | 945 B | 0644 |
|
process_getargv.py | File | 283 B | 0644 |
|
process_getenv.py | File | 268 B | 0644 |
|
process_linger.py | File | 286 B | 0644 |
|
process_reader.py | File | 188 B | 0644 |
|
process_signal.py | File | 214 B | 0644 |
|
process_stdinreader.py | File | 857 B | 0644 |
|
process_tester.py | File | 1.01 KB | 0644 |
|
process_tty.py | File | 130 B | 0644 |
|
process_twisted.py | File | 1.18 KB | 0644 |
|
proto_helpers.py | File | 26.33 KB | 0644 |
|
raiser.c | File | 93.05 KB | 0644 |
|
raiser.cpython-36m-x86_64-linux-gnu.so | File | 19.16 KB | 0644 |
|
raiser.pyx | File | 466 B | 0644 |
|
reflect_helper_IE.py | File | 61 B | 0644 |
|
reflect_helper_VE.py | File | 82 B | 0644 |
|
reflect_helper_ZDE.py | File | 47 B | 0644 |
|
server.pem | File | 4.34 KB | 0644 |
|
ssl_helpers.py | File | 1.01 KB | 0644 |
|
stdio_test_consumer.py | File | 1.19 KB | 0644 |
|
stdio_test_halfclose.py | File | 1.89 KB | 0644 |
|
stdio_test_hostpeer.py | File | 1021 B | 0644 |
|
stdio_test_lastwrite.py | File | 1.18 KB | 0644 |
|
stdio_test_loseconn.py | File | 1.51 KB | 0644 |
|
stdio_test_producer.py | File | 1.47 KB | 0644 |
|
stdio_test_write.py | File | 923 B | 0644 |
|
stdio_test_writeseq.py | File | 915 B | 0644 |
|
test_abstract.py | File | 3.42 KB | 0644 |
|
test_adbapi.py | File | 25.53 KB | 0644 |
|
test_amp.py | File | 107.96 KB | 0644 |
|
test_application.py | File | 32.05 KB | 0644 |
|
test_compat.py | File | 27.32 KB | 0644 |
|
test_context.py | File | 1.48 KB | 0644 |
|
test_cooperator.py | File | 20.96 KB | 0644 |
|
test_defer.py | File | 100.93 KB | 0644 |
|
test_defgen.py | File | 10.45 KB | 0644 |
|
test_dict.py | File | 1.41 KB | 0644 |
|
test_dirdbm.py | File | 6.76 KB | 0644 |
|
test_error.py | File | 8.39 KB | 0644 |
|
test_factories.py | File | 4.53 KB | 0644 |
|
test_failure.py | File | 29.92 KB | 0644 |
|
test_fdesc.py | File | 7.2 KB | 0644 |
|
test_finger.py | File | 1.95 KB | 0644 |
|
test_formmethod.py | File | 3.56 KB | 0644 |
|
test_ftp.py | File | 127.27 KB | 0644 |
|
test_ftp_options.py | File | 2.62 KB | 0644 |
|
test_htb.py | File | 3.12 KB | 0644 |
|
test_ident.py | File | 6.85 KB | 0644 |
|
test_internet.py | File | 45.33 KB | 0644 |
|
test_iosim.py | File | 8.49 KB | 0644 |
|
test_iutils.py | File | 13.13 KB | 0644 |
|
test_lockfile.py | File | 15.14 KB | 0644 |
|
test_log.py | File | 35.48 KB | 0644 |
|
test_logfile.py | File | 17.8 KB | 0644 |
|
test_loopback.py | File | 14.15 KB | 0644 |
|
test_main.py | File | 2.44 KB | 0644 |
|
test_memcache.py | File | 24.55 KB | 0644 |
|
test_modules.py | File | 17.47 KB | 0644 |
|
test_monkey.py | File | 5.5 KB | 0644 |
|
test_nooldstyle.py | File | 5.82 KB | 0644 |
|
test_paths.py | File | 72.61 KB | 0644 |
|
test_pcp.py | File | 12.26 KB | 0644 |
|
test_persisted.py | File | 14.28 KB | 0644 |
|
test_plugin.py | File | 25.5 KB | 0644 |
|
test_policies.py | File | 32.04 KB | 0644 |
|
test_postfix.py | File | 3.53 KB | 0644 |
|
test_process.py | File | 84.1 KB | 0644 |
|
test_protocols.py | File | 7.28 KB | 0644 |
|
test_randbytes.py | File | 3.28 KB | 0644 |
|
test_rebuild.py | File | 8.3 KB | 0644 |
|
test_reflect.py | File | 25.47 KB | 0644 |
|
test_roots.py | File | 1.77 KB | 0644 |
|
test_shortcut.py | File | 1.89 KB | 0644 |
|
test_sip.py | File | 24.69 KB | 0644 |
|
test_sob.py | File | 5.5 KB | 0644 |
|
test_socks.py | File | 17.32 KB | 0644 |
|
test_ssl.py | File | 23.29 KB | 0644 |
|
test_sslverify.py | File | 104.28 KB | 0644 |
|
test_stateful.py | File | 1.97 KB | 0644 |
|
test_stdio.py | File | 12.85 KB | 0644 |
|
test_strerror.py | File | 5.06 KB | 0644 |
|
test_stringtransport.py | File | 12.95 KB | 0644 |
|
test_strports.py | File | 1.75 KB | 0644 |
|
test_task.py | File | 38.4 KB | 0644 |
|
test_tcp.py | File | 64.07 KB | 0644 |
|
test_tcp_internals.py | File | 8.54 KB | 0644 |
|
test_text.py | File | 6.3 KB | 0644 |
|
test_threadable.py | File | 3.65 KB | 0644 |
|
test_threadpool.py | File | 22.47 KB | 0644 |
|
test_threads.py | File | 12.96 KB | 0644 |
|
test_tpfile.py | File | 1.56 KB | 0644 |
|
test_twistd.py | File | 61.05 KB | 0644 |
|
test_twisted.py | File | 18.42 KB | 0644 |
|
test_udp.py | File | 24.1 KB | 0644 |
|
test_unix.py | File | 14.8 KB | 0644 |
|
test_usage.py | File | 23.09 KB | 0644 |
|
testutils.py | File | 5.19 KB | 0644 |
|