404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.14.133.188: ~ $
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.internet.fdesc}.
"""

import os, sys
import errno

try:
    import fcntl
except ImportError:
    skip = "not supported on this platform"
else:
    from twisted.internet import fdesc

from twisted.python.util import untilConcludes
from twisted.trial import unittest



class NonBlockingTests(unittest.SynchronousTestCase):
    """
    Tests for L{fdesc.setNonBlocking} and L{fdesc.setBlocking}.
    """

    def _makePipe(self):
        """
        Create a pipe whose two ends are closed when the test finishes.

        @return: The C{(readFD, writeFD)} pair returned by L{os.pipe}.
        """
        readFD, writeFD = os.pipe()
        for fd in (readFD, writeFD):
            self.addCleanup(os.close, fd)
        return readFD, writeFD


    def _nonBlockingFlag(self, fd):
        """
        Return the C{O_NONBLOCK} bit of the status flags of C{fd}; the result
        is zero when the flag is not set.
        """
        statusFlags = fcntl.fcntl(fd, fcntl.F_GETFL)
        return statusFlags & os.O_NONBLOCK


    def test_setNonBlocking(self):
        """
        A freshly created pipe end does not have C{O_NONBLOCK} set, and
        L{fdesc.setNonBlocking} sets it.
        """
        readFD, _ = self._makePipe()
        self.assertFalse(self._nonBlockingFlag(readFD))
        fdesc.setNonBlocking(readFD)
        self.assertTrue(self._nonBlockingFlag(readFD))


    def test_setBlocking(self):
        """
        L{fdesc.setBlocking} clears C{O_NONBLOCK} on a file descriptor which
        was previously made non-blocking.
        """
        readFD, _ = self._makePipe()
        fdesc.setNonBlocking(readFD)
        fdesc.setBlocking(readFD)
        self.assertFalse(self._nonBlockingFlag(readFD))


class ReadWriteTests(unittest.SynchronousTestCase):
    """
    Tests for L{fdesc.readFromFD}, L{fdesc.writeToFD}.
    """

    def setUp(self):
        """
        Create a non-blocking pipe that can be used in tests.
        """
        self.r, self.w = os.pipe()
        fdesc.setNonBlocking(self.r)
        fdesc.setNonBlocking(self.w)


    def tearDown(self):
        """
        Close both ends of the pipe.

        Several tests close one end themselves, so an L{OSError} from closing
        an already closed descriptor is deliberately ignored.
        """
        try:
            os.close(self.w)
        except OSError:
            pass
        try:
            os.close(self.r)
        except OSError:
            pass


    def write(self, d):
        """
        Write data to the pipe.

        @param d: The bytes to write.
        @return: Whatever L{fdesc.writeToFD} returns for the write end.
        """
        return fdesc.writeToFD(self.w, d)


    def read(self):
        """
        Read data from the pipe.

        @return: If L{fdesc.readFromFD} returns something other than L{None},
            that value.  Otherwise the first chunk it delivered to its
            callback, or C{b""} if the callback was never invoked.
        """
        chunks = []
        result = fdesc.readFromFD(self.r, chunks.append)
        if result is not None:
            return result
        if chunks:
            return chunks[0]
        return b""


    def test_writeAndRead(self):
        """
        Test that the number of bytes L{fdesc.writeToFD} reports as written
        with its return value are seen by L{fdesc.readFromFD}.
        """
        n = self.write(b"hello")
        self.assertGreater(n, 0)
        s = self.read()
        self.assertEqual(len(s), n)
        self.assertEqual(b"hello"[:n], s)


    def test_writeAndReadLarge(self):
        """
        Similar to L{test_writeAndRead}, but use a much larger string to verify
        the behavior for that case.
        """
        orig = b"0123456879" * 10000
        written = self.write(orig)
        self.assertGreater(written, 0)
        result = []
        resultlength = 0
        # Upper bound on the number of reads, so that the loop is guaranteed
        # to terminate even if the written data never becomes readable; the
        # length assertion below then reports the shortfall.
        maxReads = 50
        reads = 0
        while resultlength < written and reads < maxReads:
            result.append(self.read())
            resultlength += len(result[-1])
            reads += 1
        result = b"".join(result)
        self.assertEqual(len(result), written)
        self.assertEqual(orig[:written], result)


    def test_readFromEmpty(self):
        """
        Verify that reading from a file descriptor with no data does not raise
        an exception and does not result in the callback function being called.
        """
        received = []
        result = fdesc.readFromFD(self.r, received.append)
        self.assertEqual(received, [])
        self.assertIsNone(result)


    def test_readFromCleanClose(self):
        """
        Test that using L{fdesc.readFromFD} on a cleanly closed file descriptor
        returns a connection done indicator.
        """
        os.close(self.w)
        self.assertEqual(self.read(), fdesc.CONNECTION_DONE)


    def test_writeToClosed(self):
        """
        Verify that writing with L{fdesc.writeToFD} when the read end is closed
        results in a connection lost indicator.
        """
        os.close(self.r)
        self.assertEqual(self.write(b"s"), fdesc.CONNECTION_LOST)


    def test_readFromInvalid(self):
        """
        Verify that reading with L{fdesc.readFromFD} when the read end is
        closed results in a connection lost indicator.
        """
        os.close(self.r)
        self.assertEqual(self.read(), fdesc.CONNECTION_LOST)


    def test_writeToInvalid(self):
        """
        Verify that writing with L{fdesc.writeToFD} when the write end is
        closed results in a connection lost indicator.
        """
        os.close(self.w)
        self.assertEqual(self.write(b"s"), fdesc.CONNECTION_LOST)


    def test_writeErrors(self):
        """
        Test error path for L{fdesc.writeToFD}: when L{os.write} fails with
        C{EAGAIN} or C{EINTR}, zero bytes are reported as written.
        """
        oldOsWrite = os.write

        def makeFailingWrite(code):
            # Build a stand-in for os.write which always fails with an
            # OSError carrying the given errno.
            def failingWrite(fd, data):
                err = OSError()
                err.errno = code
                raise err
            return failingWrite

        for code in (errno.EAGAIN, errno.EINTR):
            os.write = makeFailingWrite(code)
            try:
                self.assertEqual(self.write(b"s"), 0)
            finally:
                # Always restore the real os.write, even if the assertion
                # fails, so that later tests are unaffected.
                os.write = oldOsWrite



class CloseOnExecTests(unittest.SynchronousTestCase):
    """
    Tests for L{fdesc._setCloseOnExec} and L{fdesc._unsetCloseOnExec}.
    """
    # Source of the child program.  It tries to write to the file descriptor
    # number interpolated into it and reports the outcome through its exit
    # code: 0 if the write failed with EBADF, 5 for any other OSError, 10 for
    # any other exception and 20 if the write succeeded.
    program = '''
import os, errno
try:
    os.write(%d, b'lul')
except OSError as e:
    if e.errno == errno.EBADF:
        os._exit(0)
    os._exit(5)
except:
    os._exit(10)
else:
    os._exit(20)
'''

    def _execWithFileDescriptor(self, fObj):
        """
        Fork a child which replaces itself with a new Python interpreter
        running L{program} against the file descriptor of C{fObj}.

        @param fObj: An open file object providing C{fileno()}.
        @return: In the parent, the status value reported by L{os.waitpid}
            for the child.
        """
        childPID = os.fork()
        if childPID != 0:
            # On Linux wait(2) doesn't seem ever able to fail with EINTR but
            # POSIX seems to allow it and on OS X it happens quite a lot.
            _, status = untilConcludes(os.waitpid, childPID, 0)
            return status

        # Only the child gets here.  It must never return into the test
        # runner, so any failure to exec ends in an immediate exit.
        try:
            script = self.program % (fObj.fileno(),)
            os.execv(sys.executable, [sys.executable, '-c', script])
        except:
            import traceback
            traceback.print_exc()
            os._exit(30)


    def test_setCloseOnExec(self):
        """
        A file descriptor passed to L{fdesc._setCloseOnExec} is not inherited
        by a new process image created with one of the exec family of
        functions.
        """
        with open(self.mktemp(), 'wb') as fObj:
            fd = fObj.fileno()
            fdesc._setCloseOnExec(fd)
            status = self._execWithFileDescriptor(fObj)
            self.assertTrue(os.WIFEXITED(status))
            # Exit code 0 means the child's write failed with EBADF.
            self.assertEqual(os.WEXITSTATUS(status), 0)


    def test_unsetCloseOnExec(self):
        """
        A file descriptor passed to L{fdesc._unsetCloseOnExec} is inherited by
        a new process image created with one of the exec family of functions.
        """
        with open(self.mktemp(), 'wb') as fObj:
            fd = fObj.fileno()
            fdesc._setCloseOnExec(fd)
            fdesc._unsetCloseOnExec(fd)
            status = self._execWithFileDescriptor(fObj)
            self.assertTrue(os.WIFEXITED(status))
            # Exit code 20 means the child's write succeeded.
            self.assertEqual(os.WEXITSTATUS(status), 20)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 103 B 0644
cert.pem.no_trailing_newline File 1.38 KB 0644
crash_test_dummy.py File 543 B 0644
iosim.py File 17.3 KB 0644
key.pem.no_trailing_newline File 1.67 KB 0644
mock_win32process.py File 1.46 KB 0644
myrebuilder1.py File 158 B 0644
myrebuilder2.py File 158 B 0644
plugin_basic.py File 943 B 0644
plugin_extra1.py File 407 B 0644
plugin_extra2.py File 579 B 0644
process_cmdline.py File 162 B 0644
process_echoer.py File 214 B 0644
process_fds.py File 945 B 0644
process_getargv.py File 283 B 0644
process_getenv.py File 268 B 0644
process_linger.py File 286 B 0644
process_reader.py File 188 B 0644
process_signal.py File 214 B 0644
process_stdinreader.py File 857 B 0644
process_tester.py File 1.01 KB 0644
process_tty.py File 130 B 0644
process_twisted.py File 1.18 KB 0644
proto_helpers.py File 26.33 KB 0644
raiser.c File 93.05 KB 0644
raiser.cpython-36m-x86_64-linux-gnu.so File 19.16 KB 0644
raiser.pyx File 466 B 0644
reflect_helper_IE.py File 61 B 0644
reflect_helper_VE.py File 82 B 0644
reflect_helper_ZDE.py File 47 B 0644
server.pem File 4.34 KB 0644
ssl_helpers.py File 1.01 KB 0644
stdio_test_consumer.py File 1.19 KB 0644
stdio_test_halfclose.py File 1.89 KB 0644
stdio_test_hostpeer.py File 1021 B 0644
stdio_test_lastwrite.py File 1.18 KB 0644
stdio_test_loseconn.py File 1.51 KB 0644
stdio_test_producer.py File 1.47 KB 0644
stdio_test_write.py File 923 B 0644
stdio_test_writeseq.py File 915 B 0644
test_abstract.py File 3.42 KB 0644
test_adbapi.py File 25.53 KB 0644
test_amp.py File 107.96 KB 0644
test_application.py File 32.05 KB 0644
test_compat.py File 27.32 KB 0644
test_context.py File 1.48 KB 0644
test_cooperator.py File 20.96 KB 0644
test_defer.py File 100.93 KB 0644
test_defgen.py File 10.45 KB 0644
test_dict.py File 1.41 KB 0644
test_dirdbm.py File 6.76 KB 0644
test_error.py File 8.39 KB 0644
test_factories.py File 4.53 KB 0644
test_failure.py File 29.92 KB 0644
test_fdesc.py File 7.2 KB 0644
test_finger.py File 1.95 KB 0644
test_formmethod.py File 3.56 KB 0644
test_ftp.py File 127.27 KB 0644
test_ftp_options.py File 2.62 KB 0644
test_htb.py File 3.12 KB 0644
test_ident.py File 6.85 KB 0644
test_internet.py File 45.33 KB 0644
test_iosim.py File 8.49 KB 0644
test_iutils.py File 13.13 KB 0644
test_lockfile.py File 15.14 KB 0644
test_log.py File 35.48 KB 0644
test_logfile.py File 17.8 KB 0644
test_loopback.py File 14.15 KB 0644
test_main.py File 2.44 KB 0644
test_memcache.py File 24.55 KB 0644
test_modules.py File 17.47 KB 0644
test_monkey.py File 5.5 KB 0644
test_nooldstyle.py File 5.82 KB 0644
test_paths.py File 72.61 KB 0644
test_pcp.py File 12.26 KB 0644
test_persisted.py File 14.28 KB 0644
test_plugin.py File 25.5 KB 0644
test_policies.py File 32.04 KB 0644
test_postfix.py File 3.53 KB 0644
test_process.py File 84.1 KB 0644
test_protocols.py File 7.28 KB 0644
test_randbytes.py File 3.28 KB 0644
test_rebuild.py File 8.3 KB 0644
test_reflect.py File 25.47 KB 0644
test_roots.py File 1.77 KB 0644
test_shortcut.py File 1.89 KB 0644
test_sip.py File 24.69 KB 0644
test_sob.py File 5.5 KB 0644
test_socks.py File 17.32 KB 0644
test_ssl.py File 23.29 KB 0644
test_sslverify.py File 104.28 KB 0644
test_stateful.py File 1.97 KB 0644
test_stdio.py File 12.85 KB 0644
test_strerror.py File 5.06 KB 0644
test_stringtransport.py File 12.95 KB 0644
test_strports.py File 1.75 KB 0644
test_task.py File 38.4 KB 0644
test_tcp.py File 64.07 KB 0644
test_tcp_internals.py File 8.54 KB 0644
test_text.py File 6.3 KB 0644
test_threadable.py File 3.65 KB 0644
test_threadpool.py File 22.47 KB 0644
test_threads.py File 12.96 KB 0644
test_tpfile.py File 1.56 KB 0644
test_twistd.py File 61.05 KB 0644
test_twisted.py File 18.42 KB 0644
test_udp.py File 24.1 KB 0644
test_unix.py File 14.8 KB 0644
test_usage.py File 23.09 KB 0644
testutils.py File 5.19 KB 0644