404

[ Avaa Bypassed ]




Upload:

Command:

botdev@18.191.174.179: ~ $
# -*- test-case-name: twisted.words.test.test_jabbersaslmechanisms -*-
#
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Protocol agnostic implementations of SASL authentication mechanisms.
"""

from __future__ import absolute_import, division

import binascii, random, time, os
from hashlib import md5

from zope.interface import Interface, Attribute, implementer

from twisted.python.compat import iteritems, networkString


class ISASLMechanism(Interface):
    name = Attribute("""Common name for the SASL Mechanism.""")

    def getInitialResponse():
        """
        Get the initial client response, if defined for this mechanism.

        @return: initial client response string.
        @rtype: C{str}.
        """


    def getResponse(challenge):
        """
        Get the response to a server challenge.

        @param challenge: server challenge.
        @type challenge: C{str}.
        @return: client response.
        @rtype: C{str}.
        """



@implementer(ISASLMechanism)
class Anonymous(object):
    """
    Implements the ANONYMOUS SASL authentication mechanism.

    This mechanism is defined in RFC 2245.
    """
    name = 'ANONYMOUS'

    def getInitialResponse(self):
        return None



@implementer(ISASLMechanism)
class Plain(object):
    """
    Implements the PLAIN SASL authentication mechanism.

    The PLAIN SASL authentication mechanism is defined in RFC 2595.
    """
    name = 'PLAIN'

    def __init__(self, authzid, authcid, password):
        """
        @param authzid: The authorization identity.
        @type authzid: L{unicode}

        @param authcid: The authentication identity.
        @type authcid: L{unicode}

        @param password: The plain-text password.
        @type password: L{unicode}
        """

        self.authzid = authzid or u''
        self.authcid = authcid or u''
        self.password = password or u''


    def getInitialResponse(self):
        return (self.authzid.encode('utf-8') + b"\x00" +
                self.authcid.encode('utf-8') + b"\x00" +
                self.password.encode('utf-8'))



@implementer(ISASLMechanism)
class DigestMD5(object):
    """
    Implements the DIGEST-MD5 SASL authentication mechanism.

    The DIGEST-MD5 SASL authentication mechanism is defined in RFC 2831.
    """
    name = 'DIGEST-MD5'

    def __init__(self, serv_type, host, serv_name, username, password):
        """
        @param serv_type: An indication of what kind of server authentication
            is being attempted against.  For example, C{u"xmpp"}.
        @type serv_type: C{unicode}

        @param host: The authentication hostname.  Also known as the realm.
            This is used as a scope to help select the right credentials.
        @type host: C{unicode}

        @param serv_name: An additional identifier for the server.
        @type serv_name: C{unicode}

        @param username: The authentication username to use to respond to a
            challenge.
        @type username: C{unicode}

        @param username: The authentication password to use to respond to a
            challenge.
        @type password: C{unicode}
        """
        self.username = username
        self.password = password
        self.defaultRealm = host

        self.digest_uri = u'%s/%s' % (serv_type, host)
        if serv_name is not None:
            self.digest_uri += u'/%s' % (serv_name,)


    def getInitialResponse(self):
        return None


    def getResponse(self, challenge):
        directives = self._parse(challenge)

        # Compat for implementations that do not send this along with
        # a successful authentication.
        if b'rspauth' in directives:
            return b''

        charset = directives[b'charset'].decode('ascii')

        try:
            realm = directives[b'realm']
        except KeyError:
            realm = self.defaultRealm.encode(charset)

        return self._genResponse(charset,
                                 realm,
                                 directives[b'nonce'])


    def _parse(self, challenge):
        """
        Parses the server challenge.

        Splits the challenge into a dictionary of directives with values.

        @return: challenge directives and their values.
        @rtype: C{dict} of C{str} to C{str}.
        """
        s = challenge
        paramDict = {}
        cur = 0
        remainingParams = True
        while remainingParams:
            # Parse a param. We can't just split on commas, because there can
            # be some commas inside (quoted) param values, e.g.:
            # qop="auth,auth-int"

            middle = s.index(b"=", cur)
            name = s[cur:middle].lstrip()
            middle += 1
            if s[middle:middle+1] == b'"':
                middle += 1
                end = s.index(b'"', middle)
                value = s[middle:end]
                cur = s.find(b',', end) + 1
                if cur == 0:
                    remainingParams = False
            else:
                end = s.find(b',', middle)
                if end == -1:
                    value = s[middle:].rstrip()
                    remainingParams = False
                else:
                    value = s[middle:end].rstrip()
                cur = end + 1
            paramDict[name] = value

        for param in (b'qop', b'cipher'):
            if param in paramDict:
                paramDict[param] = paramDict[param].split(b',')

        return paramDict

    def _unparse(self, directives):
        """
        Create message string from directives.

        @param directives: dictionary of directives (names to their values).
                           For certain directives, extra quotes are added, as
                           needed.
        @type directives: C{dict} of C{str} to C{str}
        @return: message string.
        @rtype: C{str}.
        """

        directive_list = []
        for name, value in iteritems(directives):
            if name in (b'username', b'realm', b'cnonce',
                        b'nonce', b'digest-uri', b'authzid', b'cipher'):
                directive = name + b'=' + value
            else:
                directive = name + b'=' + value

            directive_list.append(directive)

        return b','.join(directive_list)


    def _calculateResponse(self, cnonce, nc, nonce,
                            username, password, realm, uri):
        """
        Calculates response with given encoded parameters.

        @return: The I{response} field of a response to a Digest-MD5 challenge
            of the given parameters.
        @rtype: L{bytes}
        """
        def H(s):
            return md5(s).digest()

        def HEX(n):
            return binascii.b2a_hex(n)

        def KD(k, s):
            return H(k + b':' + s)

        a1 = (H(username + b":" + realm + b":" + password) + b":" +
              nonce + b":" +
              cnonce)
        a2 = b"AUTHENTICATE:" + uri

        response = HEX(KD(HEX(H(a1)),
                       nonce + b":" + nc + b":" + cnonce + b":" +
                       b"auth" + b":" + HEX(H(a2))))
        return response


    def _genResponse(self, charset, realm, nonce):
        """
        Generate response-value.

        Creates a response to a challenge according to section 2.1.2.1 of
        RFC 2831 using the C{charset}, C{realm} and C{nonce} directives
        from the challenge.
        """
        try:
            username = self.username.encode(charset)
            password = self.password.encode(charset)
            digest_uri = self.digest_uri.encode(charset)
        except UnicodeError:
            # TODO - add error checking
            raise

        nc = networkString('%08x' % (1,)) # TODO: support subsequent auth.
        cnonce = self._gen_nonce()
        qop = b'auth'

        # TODO - add support for authzid
        response = self._calculateResponse(cnonce, nc, nonce,
                                           username, password, realm,
                                           digest_uri)

        directives = {b'username': username,
                      b'realm' : realm,
                      b'nonce' : nonce,
                      b'cnonce' : cnonce,
                      b'nc' : nc,
                      b'qop' : qop,
                      b'digest-uri': digest_uri,
                      b'response': response,
                      b'charset': charset.encode('ascii')}

        return self._unparse(directives)


    def _gen_nonce(self):
        nonceString = "%f:%f:%d" % (random.random(), time.time(), os.getpid())
        nonceBytes = networkString(nonceString)
        return md5(nonceBytes).hexdigest().encode('ascii')

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 167 B 0644
client.py File 14.04 KB 0644
component.py File 15.23 KB 0644
error.py File 10.55 KB 0644
ijabber.py File 5.25 KB 0644
jid.py File 7.08 KB 0644
jstrports.py File 978 B 0644
sasl.py File 7.29 KB 0644
sasl_mechanisms.py File 8.53 KB 0644
xmlstream.py File 35.98 KB 0644
xmpp_stringprep.py File 7.05 KB 0644