# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.internet.fdesc}.
"""

import errno
import os
import sys

try:
    import fcntl
except ImportError:
    # Without fcntl none of these tests can run; trial honors a module-level
    # ``skip`` attribute.
    skip = "not supported on this platform"
else:
    from twisted.internet import fdesc
    from twisted.python.util import untilConcludes

from twisted.trial import unittest


class NonBlockingTests(unittest.SynchronousTestCase):
    """
    Tests for L{fdesc.setNonBlocking} and L{fdesc.setBlocking}.
    """

    def test_setNonBlocking(self):
        """
        L{fdesc.setNonBlocking} sets a file description to non-blocking.
        """
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        # A fresh pipe must not already carry O_NONBLOCK, otherwise the
        # assertion after the call would prove nothing.
        self.assertFalse(fcntl.fcntl(r, fcntl.F_GETFL) & os.O_NONBLOCK)
        fdesc.setNonBlocking(r)
        self.assertTrue(fcntl.fcntl(r, fcntl.F_GETFL) & os.O_NONBLOCK)

    def test_setBlocking(self):
        """
        L{fdesc.setBlocking} sets a file description to blocking.
        """
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        # Flip the flag on first so that clearing it is observable.
        fdesc.setNonBlocking(r)
        fdesc.setBlocking(r)
        self.assertFalse(fcntl.fcntl(r, fcntl.F_GETFL) & os.O_NONBLOCK)


class ReadWriteTests(unittest.SynchronousTestCase):
    """
    Tests for L{fdesc.readFromFD}, L{fdesc.writeToFD}.
    """

    def setUp(self):
        """
        Create a non-blocking pipe that can be used in tests.
        """
        self.r, self.w = os.pipe()
        fdesc.setNonBlocking(self.r)
        fdesc.setNonBlocking(self.w)

    def tearDown(self):
        """
        Close pipes.
        """
        # Several tests close one end themselves, so a failure to close here
        # is expected and deliberately ignored.
        try:
            os.close(self.w)
        except OSError:
            pass
        try:
            os.close(self.r)
        except OSError:
            pass

    def write(self, d):
        """
        Write data to the pipe.

        @param d: the bytes to write to the write end of the pipe.
        @return: whatever L{fdesc.writeToFD} returns.
        """
        return fdesc.writeToFD(self.w, d)

    def read(self):
        """
        Read data from the pipe.

        @return: the first chunk handed to the callback if
            L{fdesc.readFromFD} returned L{None} and called it, C{b""} if it
            returned L{None} without calling it, and otherwise the value
            L{fdesc.readFromFD} returned.
        """
        chunks = []
        res = fdesc.readFromFD(self.r, chunks.append)
        if res is None:
            if chunks:
                return chunks[0]
            else:
                return b""
        else:
            return res

    def test_writeAndRead(self):
        """
        Test that the number of bytes L{fdesc.writeToFD} reports as written
        with its return value are seen by L{fdesc.readFromFD}.
        """
        n = self.write(b"hello")
        self.assertTrue(n > 0)
        s = self.read()
        self.assertEqual(len(s), n)
        self.assertEqual(b"hello"[:n], s)

    def test_writeAndReadLarge(self):
        """
        Similar to L{test_writeAndRead}, but use a much larger string to verify
        the behavior for that case.
        """
        orig = b"0123456879" * 10000
        written = self.write(orig)
        self.assertTrue(written > 0)
        result = []
        resultlength = 0
        i = 0
        # Keep reading until everything that was written has been seen, but
        # give up after 50 attempts so a short read can never hang the test;
        # the length assertion below reports the failure instead.
        while resultlength < written and i < 50:
            result.append(self.read())
            resultlength += len(result[-1])
            # Increment a counter to be sure we'll exit at some point
            i += 1
        result = b"".join(result)
        self.assertEqual(len(result), written)
        self.assertEqual(orig[:written], result)

    def test_readFromEmpty(self):
        """
        Verify that reading from a file descriptor with no data does not raise
        an exception and does not result in the callback function being called.
        """
        received = []
        result = fdesc.readFromFD(self.r, received.append)
        self.assertEqual(received, [])
        self.assertIsNone(result)

    def test_readFromCleanClose(self):
        """
        Test that using L{fdesc.readFromFD} on a cleanly closed file descriptor
        returns a connection done indicator.
        """
        os.close(self.w)
        self.assertEqual(self.read(), fdesc.CONNECTION_DONE)

    def test_writeToClosed(self):
        """
        Verify that writing with L{fdesc.writeToFD} when the read end is closed
        results in a connection lost indicator.
        """
        os.close(self.r)
        self.assertEqual(self.write(b"s"), fdesc.CONNECTION_LOST)

    def test_readFromInvalid(self):
        """
        Verify that reading with L{fdesc.readFromFD} when the read end is
        closed results in a connection lost indicator.
        """
        os.close(self.r)
        self.assertEqual(self.read(), fdesc.CONNECTION_LOST)

    def test_writeToInvalid(self):
        """
        Verify that writing with L{fdesc.writeToFD} when the write end is
        closed results in a connection lost indicator.
        """
        os.close(self.w)
        self.assertEqual(self.write(b"s"), fdesc.CONNECTION_LOST)

    def test_writeErrors(self):
        """
        Test error path for L{fdesc.writeToFD}: when C{os.write} fails with
        C{EAGAIN} or C{EINTR}, zero bytes are reported as written.
        """
        oldOsWrite = os.write

        def eagainWrite(fd, data):
            err = OSError()
            err.errno = errno.EAGAIN
            raise err

        # os.write is replaced globally, so it must be restored even when the
        # assertion fails.
        os.write = eagainWrite
        try:
            self.assertEqual(self.write(b"s"), 0)
        finally:
            os.write = oldOsWrite

        def eintrWrite(fd, data):
            err = OSError()
            err.errno = errno.EINTR
            raise err

        os.write = eintrWrite
        try:
            self.assertEqual(self.write(b"s"), 0)
        finally:
            os.write = oldOsWrite


class CloseOnExecTests(unittest.SynchronousTestCase):
    """
    Tests for L{fdesc._setCloseOnExec} and L{fdesc._unsetCloseOnExec}.
    """

    # Script run in the exec'd child.  It tries to write to the descriptor
    # number substituted for %d and reports the outcome via its exit code:
    # 0 for EBADF, 5 for any other OSError, 10 for any other exception and
    # 20 when the write succeeds.
    program = '''
import os, errno
try:
    os.write(%d, b'lul')
except OSError as e:
    if e.errno == errno.EBADF:
        os._exit(0)
    os._exit(5)
except:
    os._exit(10)
else:
    os._exit(20)
'''

    def _execWithFileDescriptor(self, fObj):
        """
        Fork, exec L{program} in the child against the descriptor of C{fObj},
        and wait for the child in the parent.

        @param fObj: an open file object whose C{fileno()} is substituted
            into L{program}.
        @return: the status element of the C{os.waitpid} result for the
            child.
        """
        pid = os.fork()
        if pid == 0:
            try:
                os.execv(
                    sys.executable,
                    [sys.executable, '-c', self.program % (fObj.fileno(),)])
            except:
                # Still in the forked child: report the failure and leave
                # through os._exit with a distinctive code.
                import traceback
                traceback.print_exc()
                os._exit(30)
        else:
            # On Linux wait(2) doesn't seem ever able to fail with EINTR but
            # POSIX seems to allow it and on OS X it happens quite a lot.
            return untilConcludes(os.waitpid, pid, 0)[1]

    def test_setCloseOnExec(self):
        """
        A file descriptor passed to L{fdesc._setCloseOnExec} is not inherited
        by a new process image created with one of the exec family of
        functions.
        """
        with open(self.mktemp(), 'wb') as fObj:
            fdesc._setCloseOnExec(fObj.fileno())
            status = self._execWithFileDescriptor(fObj)
            self.assertTrue(os.WIFEXITED(status))
            # Exit code 0 means the child's write failed with EBADF.
            self.assertEqual(os.WEXITSTATUS(status), 0)

    def test_unsetCloseOnExec(self):
        """
        A file descriptor passed to L{fdesc._unsetCloseOnExec} is inherited by
        a new process image created with one of the exec family of functions.
        """
        with open(self.mktemp(), 'wb') as fObj:
            fdesc._setCloseOnExec(fObj.fileno())
            fdesc._unsetCloseOnExec(fObj.fileno())
            status = self._execWithFileDescriptor(fObj)
            self.assertTrue(os.WIFEXITED(status))
            # Exit code 20 means the child's write succeeded.
            self.assertEqual(os.WEXITSTATUS(status), 20)
Name | Type | Size | Permission | Actions |
---|---|---|---|---|
__pycache__ | Folder | 0755 |
|
|
__init__.py | File | 103 B | 0644 |
|
cert.pem.no_trailing_newline | File | 1.38 KB | 0644 |
|
crash_test_dummy.py | File | 543 B | 0644 |
|
iosim.py | File | 17.3 KB | 0644 |
|
key.pem.no_trailing_newline | File | 1.67 KB | 0644 |
|
mock_win32process.py | File | 1.46 KB | 0644 |
|
myrebuilder1.py | File | 158 B | 0644 |
|
myrebuilder2.py | File | 158 B | 0644 |
|
plugin_basic.py | File | 943 B | 0644 |
|
plugin_extra1.py | File | 407 B | 0644 |
|
plugin_extra2.py | File | 579 B | 0644 |
|
process_cmdline.py | File | 162 B | 0644 |
|
process_echoer.py | File | 214 B | 0644 |
|
process_fds.py | File | 945 B | 0644 |
|
process_getargv.py | File | 283 B | 0644 |
|
process_getenv.py | File | 268 B | 0644 |
|
process_linger.py | File | 286 B | 0644 |
|
process_reader.py | File | 188 B | 0644 |
|
process_signal.py | File | 214 B | 0644 |
|
process_stdinreader.py | File | 857 B | 0644 |
|
process_tester.py | File | 1.01 KB | 0644 |
|
process_tty.py | File | 130 B | 0644 |
|
process_twisted.py | File | 1.18 KB | 0644 |
|
proto_helpers.py | File | 26.33 KB | 0644 |
|
raiser.c | File | 93.05 KB | 0644 |
|
raiser.cpython-36m-x86_64-linux-gnu.so | File | 19.16 KB | 0644 |
|
raiser.pyx | File | 466 B | 0644 |
|
reflect_helper_IE.py | File | 61 B | 0644 |
|
reflect_helper_VE.py | File | 82 B | 0644 |
|
reflect_helper_ZDE.py | File | 47 B | 0644 |
|
server.pem | File | 4.34 KB | 0644 |
|
ssl_helpers.py | File | 1.01 KB | 0644 |
|
stdio_test_consumer.py | File | 1.19 KB | 0644 |
|
stdio_test_halfclose.py | File | 1.89 KB | 0644 |
|
stdio_test_hostpeer.py | File | 1021 B | 0644 |
|
stdio_test_lastwrite.py | File | 1.18 KB | 0644 |
|
stdio_test_loseconn.py | File | 1.51 KB | 0644 |
|
stdio_test_producer.py | File | 1.47 KB | 0644 |
|
stdio_test_write.py | File | 923 B | 0644 |
|
stdio_test_writeseq.py | File | 915 B | 0644 |
|
test_abstract.py | File | 3.42 KB | 0644 |
|
test_adbapi.py | File | 25.53 KB | 0644 |
|
test_amp.py | File | 107.96 KB | 0644 |
|
test_application.py | File | 32.05 KB | 0644 |
|
test_compat.py | File | 27.32 KB | 0644 |
|
test_context.py | File | 1.48 KB | 0644 |
|
test_cooperator.py | File | 20.96 KB | 0644 |
|
test_defer.py | File | 100.93 KB | 0644 |
|
test_defgen.py | File | 10.45 KB | 0644 |
|
test_dict.py | File | 1.41 KB | 0644 |
|
test_dirdbm.py | File | 6.76 KB | 0644 |
|
test_error.py | File | 8.39 KB | 0644 |
|
test_factories.py | File | 4.53 KB | 0644 |
|
test_failure.py | File | 29.92 KB | 0644 |
|
test_fdesc.py | File | 7.2 KB | 0644 |
|
test_finger.py | File | 1.95 KB | 0644 |
|
test_formmethod.py | File | 3.56 KB | 0644 |
|
test_ftp.py | File | 127.27 KB | 0644 |
|
test_ftp_options.py | File | 2.62 KB | 0644 |
|
test_htb.py | File | 3.12 KB | 0644 |
|
test_ident.py | File | 6.85 KB | 0644 |
|
test_internet.py | File | 45.33 KB | 0644 |
|
test_iosim.py | File | 8.49 KB | 0644 |
|
test_iutils.py | File | 13.13 KB | 0644 |
|
test_lockfile.py | File | 15.14 KB | 0644 |
|
test_log.py | File | 35.48 KB | 0644 |
|
test_logfile.py | File | 17.8 KB | 0644 |
|
test_loopback.py | File | 14.15 KB | 0644 |
|
test_main.py | File | 2.44 KB | 0644 |
|
test_memcache.py | File | 24.55 KB | 0644 |
|
test_modules.py | File | 17.47 KB | 0644 |
|
test_monkey.py | File | 5.5 KB | 0644 |
|
test_nooldstyle.py | File | 5.82 KB | 0644 |
|
test_paths.py | File | 72.61 KB | 0644 |
|
test_pcp.py | File | 12.26 KB | 0644 |
|
test_persisted.py | File | 14.28 KB | 0644 |
|
test_plugin.py | File | 25.5 KB | 0644 |
|
test_policies.py | File | 32.04 KB | 0644 |
|
test_postfix.py | File | 3.53 KB | 0644 |
|
test_process.py | File | 84.1 KB | 0644 |
|
test_protocols.py | File | 7.28 KB | 0644 |
|
test_randbytes.py | File | 3.28 KB | 0644 |
|
test_rebuild.py | File | 8.3 KB | 0644 |
|
test_reflect.py | File | 25.47 KB | 0644 |
|
test_roots.py | File | 1.77 KB | 0644 |
|
test_shortcut.py | File | 1.89 KB | 0644 |
|
test_sip.py | File | 24.69 KB | 0644 |
|
test_sob.py | File | 5.5 KB | 0644 |
|
test_socks.py | File | 17.32 KB | 0644 |
|
test_ssl.py | File | 23.29 KB | 0644 |
|
test_sslverify.py | File | 104.28 KB | 0644 |
|
test_stateful.py | File | 1.97 KB | 0644 |
|
test_stdio.py | File | 12.85 KB | 0644 |
|
test_strerror.py | File | 5.06 KB | 0644 |
|
test_stringtransport.py | File | 12.95 KB | 0644 |
|
test_strports.py | File | 1.75 KB | 0644 |
|
test_task.py | File | 38.4 KB | 0644 |
|
test_tcp.py | File | 64.07 KB | 0644 |
|
test_tcp_internals.py | File | 8.54 KB | 0644 |
|
test_text.py | File | 6.3 KB | 0644 |
|
test_threadable.py | File | 3.65 KB | 0644 |
|
test_threadpool.py | File | 22.47 KB | 0644 |
|
test_threads.py | File | 12.96 KB | 0644 |
|
test_tpfile.py | File | 1.56 KB | 0644 |
|
test_twistd.py | File | 61.05 KB | 0644 |
|
test_twisted.py | File | 18.42 KB | 0644 |
|
test_udp.py | File | 24.1 KB | 0644 |
|
test_unix.py | File | 14.8 KB | 0644 |
|
test_usage.py | File | 23.09 KB | 0644 |
|
testutils.py | File | 5.19 KB | 0644 |
|