404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.129.206.232: ~ $
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Utilities for Twisted.names tests.
"""

from __future__ import division, absolute_import

from random import randrange

from zope.interface import implementer
from zope.interface.verify import verifyClass

from twisted.internet.address import IPv4Address
from twisted.internet.defer import succeed
from twisted.internet.task import Clock
from twisted.internet.interfaces import IReactorUDP, IUDPTransport



@implementer(IUDPTransport)
class MemoryDatagramTransport(object):
    """
    This L{IUDPTransport} implementation enforces the usual connection rules
    and captures sent traffic in a list for later inspection.

    @ivar _host: The host address to which this transport is bound.
    @ivar _protocol: The protocol connected to this transport.
    @ivar _sentPackets: A C{list} of two-tuples of the datagrams passed to
        C{write} and the addresses to which they are destined.

    @ivar _connectedTo: L{None} if this transport is unconnected, otherwise an
        address to which all traffic is supposedly sent.

    @ivar _maxPacketSize: An C{int} giving the maximum length of a datagram
        which will be successfully handled by C{write}.
    """
    def __init__(self, host, protocol, maxPacketSize):
        self._host = host
        self._protocol = protocol
        self._sentPackets = []
        self._connectedTo = None
        self._maxPacketSize = maxPacketSize


    def getHost(self):
        """
        Return the address which this transport is pretending to be bound
        to.
        """
        return IPv4Address('UDP', *self._host)


    def connect(self, host, port):
        """
        Connect this transport to the given address.
        """
        if self._connectedTo is not None:
            raise ValueError("Already connected")
        self._connectedTo = (host, port)


    def write(self, datagram, addr=None):
        """
        Send the given datagram.
        """
        if addr is None:
            addr = self._connectedTo
        if addr is None:
            raise ValueError("Need an address")
        if len(datagram) > self._maxPacketSize:
            raise ValueError("Packet too big")
        self._sentPackets.append((datagram, addr))


    def stopListening(self):
        """
        Shut down this transport.
        """
        self._protocol.stopProtocol()
        return succeed(None)


    def setBroadcastAllowed(self, enabled):
        """
        Dummy implementation to satisfy L{IUDPTransport}.
        """
        pass


    def getBroadcastAllowed(self):
        """
        Dummy implementation to satisfy L{IUDPTransport}.
        """
        pass


verifyClass(IUDPTransport, MemoryDatagramTransport)



@implementer(IReactorUDP)
class MemoryReactor(Clock):
    """
    An L{IReactorTime} and L{IReactorUDP} provider.

    Time is controlled deterministically via the base class, L{Clock}.  UDP is
    handled in-memory by connecting protocols to instances of
    L{MemoryDatagramTransport}.

    @ivar udpPorts: A C{dict} mapping port numbers to instances of
        L{MemoryDatagramTransport}.
    """
    def __init__(self):
        Clock.__init__(self)
        self.udpPorts = {}


    def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
        """
        Pretend to bind a UDP port and connect the given protocol to it.
        """
        if port == 0:
            while True:
                port = randrange(1, 2 ** 16)
                if port not in self.udpPorts:
                    break
        if port in self.udpPorts:
            raise ValueError("Address in use")
        transport = MemoryDatagramTransport(
            (interface, port), protocol, maxPacketSize)
        self.udpPorts[port] = transport
        protocol.makeConnection(transport)
        return transport

verifyClass(IReactorUDP, MemoryReactor)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 26 B 0644
test_cache.py File 4.8 KB 0644
test_client.py File 39.31 KB 0644
test_common.py File 4.11 KB 0644
test_dns.py File 150.45 KB 0644
test_examples.py File 5.22 KB 0644
test_hosts.py File 8.35 KB 0644
test_names.py File 42.09 KB 0644
test_resolve.py File 1.05 KB 0644
test_rfc1982.py File 13.66 KB 0644
test_rootresolve.py File 25.04 KB 0644
test_server.py File 40.27 KB 0644
test_srvconnect.py File 9.89 KB 0644
test_tap.py File 4.84 KB 0644
test_util.py File 3.82 KB 0644