404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.14.133.188: ~ $
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.internet.stdio}.

@var properEnv: A copy of L{os.environ} which has L{bytes} keys/values on POSIX
    platforms and native L{str} keys/values on Windows.
"""

from __future__ import absolute_import, division

import os
import sys
import itertools

from twisted.trial import unittest
from twisted.python import filepath, log
from twisted.python.reflect import requireModule
from twisted.python.runtime import platform
from twisted.python.compat import range, intToBytes, bytesEnviron
from twisted.internet import error, defer, protocol, stdio, reactor
from twisted.test.test_tcp import ConnectionLostNotifyingProtocol


# A short string which is intended to appear here and nowhere else,
# particularly not in any random garbage output CPython unavoidable
# generates (such as in warning text and so forth).  This is searched
# for in the output from stdio_test_lastwrite and if it is found at
# the end, the functionality works.
UNIQUE_LAST_WRITE_STRING = b'xyz123abc Twisted is great!'

skipWindowsNopywin32 = None
if platform.isWindows():
    if requireModule('win32process') is None:
        skipWindowsNopywin32 = ("On windows, spawnProcess is not available "
                                "in the absence of win32process.")
    properEnv = dict(os.environ)
    properEnv["PYTHONPATH"] = os.pathsep.join(sys.path)
else:
    properEnv = bytesEnviron()
    properEnv[b"PYTHONPATH"] = os.pathsep.join(sys.path).encode(
        sys.getfilesystemencoding())


class StandardIOTestProcessProtocol(protocol.ProcessProtocol):
    """
    Test helper for collecting output from a child process and notifying
    something when it exits.

    @ivar onConnection: A L{defer.Deferred} which will be called back with
    L{None} when the connection to the child process is established.

    @ivar onCompletion: A L{defer.Deferred} which will be errbacked with the
    failure associated with the child process exiting when it exits.

    @ivar onDataReceived: A L{defer.Deferred} which will be called back with
    this instance whenever C{childDataReceived} is called, or L{None} to
    suppress these callbacks.

    @ivar data: A C{dict} mapping file descriptors to strings containing all
    bytes received from the child process on each file descriptor.
    """
    onDataReceived = None

    def __init__(self):
        self.onConnection = defer.Deferred()
        self.onCompletion = defer.Deferred()
        self.data = {}


    def connectionMade(self):
        self.onConnection.callback(None)


    def childDataReceived(self, name, bytes):
        """
        Record all bytes received from the child process in the C{data}
        dictionary.  Fire C{onDataReceived} if it is not L{None}.
        """
        self.data[name] = self.data.get(name, b'') + bytes
        if self.onDataReceived is not None:
            d, self.onDataReceived = self.onDataReceived, None
            d.callback(self)


    def processEnded(self, reason):
        self.onCompletion.callback(reason)



class StandardInputOutputTests(unittest.TestCase):

    skip = skipWindowsNopywin32

    def _spawnProcess(self, proto, sibling, *args, **kw):
        """
        Launch a child Python process and communicate with it using the
        given ProcessProtocol.

        @param proto: A L{ProcessProtocol} instance which will be connected
        to the child process.

        @param sibling: The basename of a file containing the Python program
        to run in the child process.

        @param *args: strings which will be passed to the child process on
        the command line as C{argv[2:]}.

        @param **kw: additional arguments to pass to L{reactor.spawnProcess}.

        @return: The L{IProcessTransport} provider for the spawned process.
        """
        args = [sys.executable,
                b"-m", b"twisted.test." + sibling,
                reactor.__class__.__module__] + list(args)
        return reactor.spawnProcess(
            proto,
            sys.executable,
            args,
            env=properEnv,
            **kw)


    def _requireFailure(self, d, callback):
        def cb(result):
            self.fail("Process terminated with non-Failure: %r" % (result,))
        def eb(err):
            return callback(err)
        return d.addCallbacks(cb, eb)


    def test_loseConnection(self):
        """
        Verify that a protocol connected to L{StandardIO} can disconnect
        itself using C{transport.loseConnection}.
        """
        errorLogFile = self.mktemp()
        log.msg("Child process logging to " + errorLogFile)
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion
        self._spawnProcess(p, b'stdio_test_loseconn', errorLogFile)

        def processEnded(reason):
            # Copy the child's log to ours so it's more visible.
            with open(errorLogFile, 'r') as f:
                for line in f:
                    log.msg("Child logged: " + line.rstrip())

            self.failIfIn(1, p.data)
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_readConnectionLost(self):
        """
        When stdin is closed and the protocol connected to it implements
        L{IHalfCloseableProtocol}, the protocol's C{readConnectionLost} method
        is called.
        """
        errorLogFile = self.mktemp()
        log.msg("Child process logging to " + errorLogFile)
        p = StandardIOTestProcessProtocol()
        p.onDataReceived = defer.Deferred()

        def cbBytes(ignored):
            d = p.onCompletion
            p.transport.closeStdin()
            return d
        p.onDataReceived.addCallback(cbBytes)

        def processEnded(reason):
            reason.trap(error.ProcessDone)
        d = self._requireFailure(p.onDataReceived, processEnded)

        self._spawnProcess(
            p, b'stdio_test_halfclose', errorLogFile)
        return d


    def test_lastWriteReceived(self):
        """
        Verify that a write made directly to stdout using L{os.write}
        after StandardIO has finished is reliably received by the
        process reading that stdout.
        """
        p = StandardIOTestProcessProtocol()

        # Note: the OS X bug which prompted the addition of this test
        # is an apparent race condition involving non-blocking PTYs.
        # Delaying the parent process significantly increases the
        # likelihood of the race going the wrong way.  If you need to
        # fiddle with this code at all, uncommenting the next line
        # will likely make your life much easier.  It is commented out
        # because it makes the test quite slow.

        # p.onConnection.addCallback(lambda ign: __import__('time').sleep(5))

        try:
            self._spawnProcess(
                p, b'stdio_test_lastwrite', UNIQUE_LAST_WRITE_STRING,
                usePTY=True)
        except ValueError as e:
            # Some platforms don't work with usePTY=True
            raise unittest.SkipTest(str(e))

        def processEnded(reason):
            """
            Asserts that the parent received the bytes written by the child
            immediately after the child starts.
            """
            self.assertTrue(
                p.data[1].endswith(UNIQUE_LAST_WRITE_STRING),
                "Received %r from child, did not find expected bytes." % (
                    p.data,))
            reason.trap(error.ProcessDone)
        return self._requireFailure(p.onCompletion, processEnded)


    def test_hostAndPeer(self):
        """
        Verify that the transport of a protocol connected to L{StandardIO}
        has C{getHost} and C{getPeer} methods.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion
        self._spawnProcess(p, b'stdio_test_hostpeer')

        def processEnded(reason):
            host, peer = p.data[1].splitlines()
            self.assertTrue(host)
            self.assertTrue(peer)
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_write(self):
        """
        Verify that the C{write} method of the transport of a protocol
        connected to L{StandardIO} sends bytes to standard out.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        self._spawnProcess(p, b'stdio_test_write')

        def processEnded(reason):
            self.assertEqual(p.data[1], b'ok!')
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_writeSequence(self):
        """
        Verify that the C{writeSequence} method of the transport of a
        protocol connected to L{StandardIO} sends bytes to standard out.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        self._spawnProcess(p, b'stdio_test_writeseq')

        def processEnded(reason):
            self.assertEqual(p.data[1], b'ok!')
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def _junkPath(self):
        junkPath = self.mktemp()
        with open(junkPath, 'wb') as junkFile:
            for i in range(1024):
                junkFile.write(intToBytes(i) + b'\n')
        return junkPath


    def test_producer(self):
        """
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IProducer} provider.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        written = []
        toWrite = list(range(100))

        def connectionMade(ign):
            if toWrite:
                written.append(intToBytes(toWrite.pop()) + b"\n")
                proc.write(written[-1])
                reactor.callLater(0.01, connectionMade, None)

        proc = self._spawnProcess(p, b'stdio_test_producer')

        p.onConnection.addCallback(connectionMade)

        def processEnded(reason):
            self.assertEqual(p.data[1], b''.join(written))
            self.assertFalse(
                toWrite,
                "Connection lost with %d writes left to go." % (len(toWrite),))
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_consumer(self):
        """
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IConsumer} provider.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        junkPath = self._junkPath()

        self._spawnProcess(p, b'stdio_test_consumer', junkPath)

        def processEnded(reason):
            with open(junkPath, 'rb') as f:
                self.assertEqual(p.data[1], f.read())
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_normalFileStandardOut(self):
        """
        If L{StandardIO} is created with a file descriptor which refers to a
        normal file (ie, a file from the filesystem), L{StandardIO.write}
        writes bytes to that file.  In particular, it does not immediately
        consider the file closed or call its protocol's C{connectionLost}
        method.
        """
        onConnLost = defer.Deferred()
        proto = ConnectionLostNotifyingProtocol(onConnLost)
        path = filepath.FilePath(self.mktemp())
        self.normal = normal = path.open('wb')
        self.addCleanup(normal.close)

        kwargs = dict(stdout=normal.fileno())
        if not platform.isWindows():
            # Make a fake stdin so that StandardIO doesn't mess with the *real*
            # stdin.
            r, w = os.pipe()
            self.addCleanup(os.close, r)
            self.addCleanup(os.close, w)
            kwargs['stdin'] = r
        connection = stdio.StandardIO(proto, **kwargs)

        # The reactor needs to spin a bit before it might have incorrectly
        # decided stdout is closed.  Use this counter to keep track of how
        # much we've let it spin.  If it closes before we expected, this
        # counter will have a value that's too small and we'll know.
        howMany = 5
        count = itertools.count()

        def spin():
            for value in count:
                if value == howMany:
                    connection.loseConnection()
                    return
                connection.write(intToBytes(value))
                break
            reactor.callLater(0, spin)
        reactor.callLater(0, spin)

        # Once the connection is lost, make sure the counter is at the
        # appropriate value.
        def cbLost(reason):
            self.assertEqual(next(count), howMany + 1)
            self.assertEqual(
                path.getContent(),
                b''.join(map(intToBytes, range(howMany))))
        onConnLost.addCallback(cbLost)
        return onConnLost

    if platform.isWindows():
        test_normalFileStandardOut.skip = (
            "StandardIO does not accept stdout as an argument to Windows.  "
            "Testing redirection to a file is therefore harder.")

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 103 B 0644
cert.pem.no_trailing_newline File 1.38 KB 0644
crash_test_dummy.py File 543 B 0644
iosim.py File 17.3 KB 0644
key.pem.no_trailing_newline File 1.67 KB 0644
mock_win32process.py File 1.46 KB 0644
myrebuilder1.py File 158 B 0644
myrebuilder2.py File 158 B 0644
plugin_basic.py File 943 B 0644
plugin_extra1.py File 407 B 0644
plugin_extra2.py File 579 B 0644
process_cmdline.py File 162 B 0644
process_echoer.py File 214 B 0644
process_fds.py File 945 B 0644
process_getargv.py File 283 B 0644
process_getenv.py File 268 B 0644
process_linger.py File 286 B 0644
process_reader.py File 188 B 0644
process_signal.py File 214 B 0644
process_stdinreader.py File 857 B 0644
process_tester.py File 1.01 KB 0644
process_tty.py File 130 B 0644
process_twisted.py File 1.18 KB 0644
proto_helpers.py File 26.33 KB 0644
raiser.c File 93.05 KB 0644
raiser.cpython-36m-x86_64-linux-gnu.so File 19.16 KB 0644
raiser.pyx File 466 B 0644
reflect_helper_IE.py File 61 B 0644
reflect_helper_VE.py File 82 B 0644
reflect_helper_ZDE.py File 47 B 0644
server.pem File 4.34 KB 0644
ssl_helpers.py File 1.01 KB 0644
stdio_test_consumer.py File 1.19 KB 0644
stdio_test_halfclose.py File 1.89 KB 0644
stdio_test_hostpeer.py File 1021 B 0644
stdio_test_lastwrite.py File 1.18 KB 0644
stdio_test_loseconn.py File 1.51 KB 0644
stdio_test_producer.py File 1.47 KB 0644
stdio_test_write.py File 923 B 0644
stdio_test_writeseq.py File 915 B 0644
test_abstract.py File 3.42 KB 0644
test_adbapi.py File 25.53 KB 0644
test_amp.py File 107.96 KB 0644
test_application.py File 32.05 KB 0644
test_compat.py File 27.32 KB 0644
test_context.py File 1.48 KB 0644
test_cooperator.py File 20.96 KB 0644
test_defer.py File 100.93 KB 0644
test_defgen.py File 10.45 KB 0644
test_dict.py File 1.41 KB 0644
test_dirdbm.py File 6.76 KB 0644
test_error.py File 8.39 KB 0644
test_factories.py File 4.53 KB 0644
test_failure.py File 29.92 KB 0644
test_fdesc.py File 7.2 KB 0644
test_finger.py File 1.95 KB 0644
test_formmethod.py File 3.56 KB 0644
test_ftp.py File 127.27 KB 0644
test_ftp_options.py File 2.62 KB 0644
test_htb.py File 3.12 KB 0644
test_ident.py File 6.85 KB 0644
test_internet.py File 45.33 KB 0644
test_iosim.py File 8.49 KB 0644
test_iutils.py File 13.13 KB 0644
test_lockfile.py File 15.14 KB 0644
test_log.py File 35.48 KB 0644
test_logfile.py File 17.8 KB 0644
test_loopback.py File 14.15 KB 0644
test_main.py File 2.44 KB 0644
test_memcache.py File 24.55 KB 0644
test_modules.py File 17.47 KB 0644
test_monkey.py File 5.5 KB 0644
test_nooldstyle.py File 5.82 KB 0644
test_paths.py File 72.61 KB 0644
test_pcp.py File 12.26 KB 0644
test_persisted.py File 14.28 KB 0644
test_plugin.py File 25.5 KB 0644
test_policies.py File 32.04 KB 0644
test_postfix.py File 3.53 KB 0644
test_process.py File 84.1 KB 0644
test_protocols.py File 7.28 KB 0644
test_randbytes.py File 3.28 KB 0644
test_rebuild.py File 8.3 KB 0644
test_reflect.py File 25.47 KB 0644
test_roots.py File 1.77 KB 0644
test_shortcut.py File 1.89 KB 0644
test_sip.py File 24.69 KB 0644
test_sob.py File 5.5 KB 0644
test_socks.py File 17.32 KB 0644
test_ssl.py File 23.29 KB 0644
test_sslverify.py File 104.28 KB 0644
test_stateful.py File 1.97 KB 0644
test_stdio.py File 12.85 KB 0644
test_strerror.py File 5.06 KB 0644
test_stringtransport.py File 12.95 KB 0644
test_strports.py File 1.75 KB 0644
test_task.py File 38.4 KB 0644
test_tcp.py File 64.07 KB 0644
test_tcp_internals.py File 8.54 KB 0644
test_text.py File 6.3 KB 0644
test_threadable.py File 3.65 KB 0644
test_threadpool.py File 22.47 KB 0644
test_threads.py File 12.96 KB 0644
test_tpfile.py File 1.56 KB 0644
test_twistd.py File 61.05 KB 0644
test_twisted.py File 18.42 KB 0644
test_udp.py File 24.1 KB 0644
test_unix.py File 14.8 KB 0644
test_usage.py File 23.09 KB 0644
testutils.py File 5.19 KB 0644