404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.16.139.76: ~ $
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

from __future__ import absolute_import, division, print_function

import datetime
import ipaddress

from asn1crypto.core import Integer, SequenceOf

from cryptography import x509
from cryptography.x509.extensions import _TLS_FEATURE_TYPE_TO_ENUM
from cryptography.x509.name import _ASN1_TYPE_TO_ENUM
from cryptography.x509.oid import (
    CRLEntryExtensionOID, CertificatePoliciesOID, ExtensionOID
)


class _Integers(SequenceOf):
    _child_spec = Integer


def _obj2txt(backend, obj):
    # Set to 80 on the recommendation of
    # https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
    #
    # But OIDs longer than this occur in real life (e.g. Active
    # Directory makes some very long OIDs).  So we need to detect
    # and properly handle the case where the default buffer is not
    # big enough.
    #
    buf_len = 80
    buf = backend._ffi.new("char[]", buf_len)

    # 'res' is the number of bytes that *would* be written if the
    # buffer is large enough.  If 'res' > buf_len - 1, we need to
    # alloc a big-enough buffer and go again.
    res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
    if res > buf_len - 1:  # account for terminating null byte
        buf_len = res + 1
        buf = backend._ffi.new("char[]", buf_len)
        res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
    backend.openssl_assert(res > 0)
    return backend._ffi.buffer(buf, res)[:].decode()


def _decode_x509_name_entry(backend, x509_name_entry):
    obj = backend._lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(obj != backend._ffi.NULL)
    data = backend._lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(data != backend._ffi.NULL)
    value = _asn1_string_to_utf8(backend, data)
    oid = _obj2txt(backend, obj)
    type = _ASN1_TYPE_TO_ENUM[data.type]

    return x509.NameAttribute(x509.ObjectIdentifier(oid), value, type)


def _decode_x509_name(backend, x509_name):
    count = backend._lib.X509_NAME_entry_count(x509_name)
    attributes = []
    prev_set_id = -1
    for x in range(count):
        entry = backend._lib.X509_NAME_get_entry(x509_name, x)
        attribute = _decode_x509_name_entry(backend, entry)
        set_id = backend._lib.Cryptography_X509_NAME_ENTRY_set(entry)
        if set_id != prev_set_id:
            attributes.append(set([attribute]))
        else:
            # is in the same RDN a previous entry
            attributes[-1].add(attribute)
        prev_set_id = set_id

    return x509.Name(x509.RelativeDistinguishedName(rdn) for rdn in attributes)


def _decode_general_names(backend, gns):
    num = backend._lib.sk_GENERAL_NAME_num(gns)
    names = []
    for i in range(num):
        gn = backend._lib.sk_GENERAL_NAME_value(gns, i)
        backend.openssl_assert(gn != backend._ffi.NULL)
        names.append(_decode_general_name(backend, gn))

    return names


def _decode_general_name(backend, gn):
    if gn.type == backend._lib.GEN_DNS:
        # Convert to bytes and then decode to utf8. We don't use
        # asn1_string_to_utf8 here because it doesn't properly convert
        # utf8 from ia5strings.
        data = _asn1_string_to_bytes(backend, gn.d.dNSName).decode("utf8")
        # We don't use the constructor for DNSName so we can bypass validation
        # This allows us to create DNSName objects that have unicode chars
        # when a certificate (against the RFC) contains them.
        return x509.DNSName._init_without_validation(data)
    elif gn.type == backend._lib.GEN_URI:
        # Convert to bytes and then decode to utf8. We don't use
        # asn1_string_to_utf8 here because it doesn't properly convert
        # utf8 from ia5strings.
        data = _asn1_string_to_bytes(
            backend, gn.d.uniformResourceIdentifier
        ).decode("utf8")
        # We don't use the constructor for URI so we can bypass validation
        # This allows us to create URI objects that have unicode chars
        # when a certificate (against the RFC) contains them.
        return x509.UniformResourceIdentifier._init_without_validation(data)
    elif gn.type == backend._lib.GEN_RID:
        oid = _obj2txt(backend, gn.d.registeredID)
        return x509.RegisteredID(x509.ObjectIdentifier(oid))
    elif gn.type == backend._lib.GEN_IPADD:
        data = _asn1_string_to_bytes(backend, gn.d.iPAddress)
        data_len = len(data)
        if data_len == 8 or data_len == 32:
            # This is an IPv4 or IPv6 Network and not a single IP. This
            # type of data appears in Name Constraints. Unfortunately,
            # ipaddress doesn't support packed bytes + netmask. Additionally,
            # IPv6Network can only handle CIDR rather than the full 16 byte
            # netmask. To handle this we convert the netmask to integer, then
            # find the first 0 bit, which will be the prefix. If another 1
            # bit is present after that the netmask is invalid.
            base = ipaddress.ip_address(data[:data_len // 2])
            netmask = ipaddress.ip_address(data[data_len // 2:])
            bits = bin(int(netmask))[2:]
            prefix = bits.find('0')
            # If no 0 bits are found it is a /32 or /128
            if prefix == -1:
                prefix = len(bits)

            if "1" in bits[prefix:]:
                raise ValueError("Invalid netmask")

            ip = ipaddress.ip_network(base.exploded + u"/{0}".format(prefix))
        else:
            ip = ipaddress.ip_address(data)

        return x509.IPAddress(ip)
    elif gn.type == backend._lib.GEN_DIRNAME:
        return x509.DirectoryName(
            _decode_x509_name(backend, gn.d.directoryName)
        )
    elif gn.type == backend._lib.GEN_EMAIL:
        # Convert to bytes and then decode to utf8. We don't use
        # asn1_string_to_utf8 here because it doesn't properly convert
        # utf8 from ia5strings.
        data = _asn1_string_to_bytes(backend, gn.d.rfc822Name).decode("utf8")
        # We don't use the constructor for RFC822Name so we can bypass
        # validation. This allows us to create RFC822Name objects that have
        # unicode chars when a certificate (against the RFC) contains them.
        return x509.RFC822Name._init_without_validation(data)
    elif gn.type == backend._lib.GEN_OTHERNAME:
        type_id = _obj2txt(backend, gn.d.otherName.type_id)
        value = _asn1_to_der(backend, gn.d.otherName.value)
        return x509.OtherName(x509.ObjectIdentifier(type_id), value)
    else:
        # x400Address or ediPartyName
        raise x509.UnsupportedGeneralNameType(
            "{0} is not a supported type".format(
                x509._GENERAL_NAMES.get(gn.type, gn.type)
            ),
            gn.type
        )


def _decode_ocsp_no_check(backend, ext):
    return x509.OCSPNoCheck()


def _decode_crl_number(backend, ext):
    asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext)
    asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
    return x509.CRLNumber(_asn1_integer_to_int(backend, asn1_int))


def _decode_delta_crl_indicator(backend, ext):
    asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext)
    asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
    return x509.DeltaCRLIndicator(_asn1_integer_to_int(backend, asn1_int))


class _X509ExtensionParser(object):
    def __init__(self, ext_count, get_ext, handlers):
        self.ext_count = ext_count
        self.get_ext = get_ext
        self.handlers = handlers

    def parse(self, backend, x509_obj):
        extensions = []
        seen_oids = set()
        for i in range(self.ext_count(backend, x509_obj)):
            ext = self.get_ext(backend, x509_obj, i)
            backend.openssl_assert(ext != backend._ffi.NULL)
            crit = backend._lib.X509_EXTENSION_get_critical(ext)
            critical = crit == 1
            oid = x509.ObjectIdentifier(
                _obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
            )
            if oid in seen_oids:
                raise x509.DuplicateExtension(
                    "Duplicate {0} extension found".format(oid), oid
                )

            # This OID is only supported in OpenSSL 1.1.0+ but we want
            # to support it in all versions of OpenSSL so we decode it
            # ourselves.
            if oid == ExtensionOID.TLS_FEATURE:
                data = backend._lib.X509_EXTENSION_get_data(ext)
                parsed = _Integers.load(_asn1_string_to_bytes(backend, data))
                value = x509.TLSFeature(
                    [_TLS_FEATURE_TYPE_TO_ENUM[x.native] for x in parsed]
                )
                extensions.append(x509.Extension(oid, critical, value))
                seen_oids.add(oid)
                continue

            try:
                handler = self.handlers[oid]
            except KeyError:
                # Dump the DER payload into an UnrecognizedExtension object
                data = backend._lib.X509_EXTENSION_get_data(ext)
                backend.openssl_assert(data != backend._ffi.NULL)
                der = backend._ffi.buffer(data.data, data.length)[:]
                unrecognized = x509.UnrecognizedExtension(oid, der)
                extensions.append(
                    x509.Extension(oid, critical, unrecognized)
                )
            else:
                ext_data = backend._lib.X509V3_EXT_d2i(ext)
                if ext_data == backend._ffi.NULL:
                    backend._consume_errors()
                    raise ValueError(
                        "The {0} extension is invalid and can't be "
                        "parsed".format(oid)
                    )

                value = handler(backend, ext_data)
                extensions.append(x509.Extension(oid, critical, value))

            seen_oids.add(oid)

        return x509.Extensions(extensions)


def _decode_certificate_policies(backend, cp):
    cp = backend._ffi.cast("Cryptography_STACK_OF_POLICYINFO *", cp)
    cp = backend._ffi.gc(cp, backend._lib.CERTIFICATEPOLICIES_free)

    num = backend._lib.sk_POLICYINFO_num(cp)
    certificate_policies = []
    for i in range(num):
        qualifiers = None
        pi = backend._lib.sk_POLICYINFO_value(cp, i)
        oid = x509.ObjectIdentifier(_obj2txt(backend, pi.policyid))
        if pi.qualifiers != backend._ffi.NULL:
            qnum = backend._lib.sk_POLICYQUALINFO_num(pi.qualifiers)
            qualifiers = []
            for j in range(qnum):
                pqi = backend._lib.sk_POLICYQUALINFO_value(
                    pi.qualifiers, j
                )
                pqualid = x509.ObjectIdentifier(
                    _obj2txt(backend, pqi.pqualid)
                )
                if pqualid == CertificatePoliciesOID.CPS_QUALIFIER:
                    cpsuri = backend._ffi.buffer(
                        pqi.d.cpsuri.data, pqi.d.cpsuri.length
                    )[:].decode('ascii')
                    qualifiers.append(cpsuri)
                else:
                    assert pqualid == CertificatePoliciesOID.CPS_USER_NOTICE
                    user_notice = _decode_user_notice(
                        backend, pqi.d.usernotice
                    )
                    qualifiers.append(user_notice)

        certificate_policies.append(
            x509.PolicyInformation(oid, qualifiers)
        )

    return x509.CertificatePolicies(certificate_policies)


def _decode_user_notice(backend, un):
    explicit_text = None
    notice_reference = None

    if un.exptext != backend._ffi.NULL:
        explicit_text = _asn1_string_to_utf8(backend, un.exptext)

    if un.noticeref != backend._ffi.NULL:
        organization = _asn1_string_to_utf8(
            backend, un.noticeref.organization
        )

        num = backend._lib.sk_ASN1_INTEGER_num(
            un.noticeref.noticenos
        )
        notice_numbers = []
        for i in range(num):
            asn1_int = backend._lib.sk_ASN1_INTEGER_value(
                un.noticeref.noticenos, i
            )
            notice_num = _asn1_integer_to_int(backend, asn1_int)
            notice_numbers.append(notice_num)

        notice_reference = x509.NoticeReference(
            organization, notice_numbers
        )

    return x509.UserNotice(notice_reference, explicit_text)


def _decode_basic_constraints(backend, bc_st):
    basic_constraints = backend._ffi.cast("BASIC_CONSTRAINTS *", bc_st)
    basic_constraints = backend._ffi.gc(
        basic_constraints, backend._lib.BASIC_CONSTRAINTS_free
    )
    # The byte representation of an ASN.1 boolean true is \xff. OpenSSL
    # chooses to just map this to its ordinal value, so true is 255 and
    # false is 0.
    ca = basic_constraints.ca == 255
    path_length = _asn1_integer_to_int_or_none(
        backend, basic_constraints.pathlen
    )

    return x509.BasicConstraints(ca, path_length)


def _decode_subject_key_identifier(backend, asn1_string):
    asn1_string = backend._ffi.cast("ASN1_OCTET_STRING *", asn1_string)
    asn1_string = backend._ffi.gc(
        asn1_string, backend._lib.ASN1_OCTET_STRING_free
    )
    return x509.SubjectKeyIdentifier(
        backend._ffi.buffer(asn1_string.data, asn1_string.length)[:]
    )


def _decode_authority_key_identifier(backend, akid):
    akid = backend._ffi.cast("AUTHORITY_KEYID *", akid)
    akid = backend._ffi.gc(akid, backend._lib.AUTHORITY_KEYID_free)
    key_identifier = None
    authority_cert_issuer = None

    if akid.keyid != backend._ffi.NULL:
        key_identifier = backend._ffi.buffer(
            akid.keyid.data, akid.keyid.length
        )[:]

    if akid.issuer != backend._ffi.NULL:
        authority_cert_issuer = _decode_general_names(
            backend, akid.issuer
        )

    authority_cert_serial_number = _asn1_integer_to_int_or_none(
        backend, akid.serial
    )

    return x509.AuthorityKeyIdentifier(
        key_identifier, authority_cert_issuer, authority_cert_serial_number
    )


def _decode_authority_information_access(backend, aia):
    aia = backend._ffi.cast("Cryptography_STACK_OF_ACCESS_DESCRIPTION *", aia)
    aia = backend._ffi.gc(aia, backend._lib.sk_ACCESS_DESCRIPTION_free)
    num = backend._lib.sk_ACCESS_DESCRIPTION_num(aia)
    access_descriptions = []
    for i in range(num):
        ad = backend._lib.sk_ACCESS_DESCRIPTION_value(aia, i)
        backend.openssl_assert(ad.method != backend._ffi.NULL)
        oid = x509.ObjectIdentifier(_obj2txt(backend, ad.method))
        backend.openssl_assert(ad.location != backend._ffi.NULL)
        gn = _decode_general_name(backend, ad.location)
        access_descriptions.append(x509.AccessDescription(oid, gn))

    return x509.AuthorityInformationAccess(access_descriptions)


def _decode_key_usage(backend, bit_string):
    bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
    bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
    get_bit = backend._lib.ASN1_BIT_STRING_get_bit
    digital_signature = get_bit(bit_string, 0) == 1
    content_commitment = get_bit(bit_string, 1) == 1
    key_encipherment = get_bit(bit_string, 2) == 1
    data_encipherment = get_bit(bit_string, 3) == 1
    key_agreement = get_bit(bit_string, 4) == 1
    key_cert_sign = get_bit(bit_string, 5) == 1
    crl_sign = get_bit(bit_string, 6) == 1
    encipher_only = get_bit(bit_string, 7) == 1
    decipher_only = get_bit(bit_string, 8) == 1
    return x509.KeyUsage(
        digital_signature,
        content_commitment,
        key_encipherment,
        data_encipherment,
        key_agreement,
        key_cert_sign,
        crl_sign,
        encipher_only,
        decipher_only
    )


def _decode_general_names_extension(backend, gns):
    gns = backend._ffi.cast("GENERAL_NAMES *", gns)
    gns = backend._ffi.gc(gns, backend._lib.GENERAL_NAMES_free)
    general_names = _decode_general_names(backend, gns)
    return general_names


def _decode_subject_alt_name(backend, ext):
    return x509.SubjectAlternativeName(
        _decode_general_names_extension(backend, ext)
    )


def _decode_issuer_alt_name(backend, ext):
    return x509.IssuerAlternativeName(
        _decode_general_names_extension(backend, ext)
    )


def _decode_name_constraints(backend, nc):
    nc = backend._ffi.cast("NAME_CONSTRAINTS *", nc)
    nc = backend._ffi.gc(nc, backend._lib.NAME_CONSTRAINTS_free)
    permitted = _decode_general_subtrees(backend, nc.permittedSubtrees)
    excluded = _decode_general_subtrees(backend, nc.excludedSubtrees)
    return x509.NameConstraints(
        permitted_subtrees=permitted, excluded_subtrees=excluded
    )


def _decode_general_subtrees(backend, stack_subtrees):
    if stack_subtrees == backend._ffi.NULL:
        return None

    num = backend._lib.sk_GENERAL_SUBTREE_num(stack_subtrees)
    subtrees = []

    for i in range(num):
        obj = backend._lib.sk_GENERAL_SUBTREE_value(stack_subtrees, i)
        backend.openssl_assert(obj != backend._ffi.NULL)
        name = _decode_general_name(backend, obj.base)
        subtrees.append(name)

    return subtrees


def _decode_policy_constraints(backend, pc):
    pc = backend._ffi.cast("POLICY_CONSTRAINTS *", pc)
    pc = backend._ffi.gc(pc, backend._lib.POLICY_CONSTRAINTS_free)

    require_explicit_policy = _asn1_integer_to_int_or_none(
        backend, pc.requireExplicitPolicy
    )
    inhibit_policy_mapping = _asn1_integer_to_int_or_none(
        backend, pc.inhibitPolicyMapping
    )

    return x509.PolicyConstraints(
        require_explicit_policy, inhibit_policy_mapping
    )


def _decode_extended_key_usage(backend, sk):
    sk = backend._ffi.cast("Cryptography_STACK_OF_ASN1_OBJECT *", sk)
    sk = backend._ffi.gc(sk, backend._lib.sk_ASN1_OBJECT_free)
    num = backend._lib.sk_ASN1_OBJECT_num(sk)
    ekus = []

    for i in range(num):
        obj = backend._lib.sk_ASN1_OBJECT_value(sk, i)
        backend.openssl_assert(obj != backend._ffi.NULL)
        oid = x509.ObjectIdentifier(_obj2txt(backend, obj))
        ekus.append(oid)

    return x509.ExtendedKeyUsage(ekus)


_DISTPOINT_TYPE_FULLNAME = 0
_DISTPOINT_TYPE_RELATIVENAME = 1


def _decode_dist_points(backend, cdps):
    cdps = backend._ffi.cast("Cryptography_STACK_OF_DIST_POINT *", cdps)
    cdps = backend._ffi.gc(cdps, backend._lib.CRL_DIST_POINTS_free)

    num = backend._lib.sk_DIST_POINT_num(cdps)
    dist_points = []
    for i in range(num):
        full_name = None
        relative_name = None
        crl_issuer = None
        reasons = None
        cdp = backend._lib.sk_DIST_POINT_value(cdps, i)
        if cdp.reasons != backend._ffi.NULL:
            # We will check each bit from RFC 5280
            # ReasonFlags ::= BIT STRING {
            #      unused                  (0),
            #      keyCompromise           (1),
            #      cACompromise            (2),
            #      affiliationChanged      (3),
            #      superseded              (4),
            #      cessationOfOperation    (5),
            #      certificateHold         (6),
            #      privilegeWithdrawn      (7),
            #      aACompromise            (8) }
            reasons = []
            get_bit = backend._lib.ASN1_BIT_STRING_get_bit
            if get_bit(cdp.reasons, 1):
                reasons.append(x509.ReasonFlags.key_compromise)

            if get_bit(cdp.reasons, 2):
                reasons.append(x509.ReasonFlags.ca_compromise)

            if get_bit(cdp.reasons, 3):
                reasons.append(x509.ReasonFlags.affiliation_changed)

            if get_bit(cdp.reasons, 4):
                reasons.append(x509.ReasonFlags.superseded)

            if get_bit(cdp.reasons, 5):
                reasons.append(x509.ReasonFlags.cessation_of_operation)

            if get_bit(cdp.reasons, 6):
                reasons.append(x509.ReasonFlags.certificate_hold)

            if get_bit(cdp.reasons, 7):
                reasons.append(x509.ReasonFlags.privilege_withdrawn)

            if get_bit(cdp.reasons, 8):
                reasons.append(x509.ReasonFlags.aa_compromise)

            reasons = frozenset(reasons)

        if cdp.CRLissuer != backend._ffi.NULL:
            crl_issuer = _decode_general_names(backend, cdp.CRLissuer)

        # Certificates may have a crl_issuer/reasons and no distribution
        # point so make sure it's not null.
        if cdp.distpoint != backend._ffi.NULL:
            # Type 0 is fullName, there is no #define for it in the code.
            if cdp.distpoint.type == _DISTPOINT_TYPE_FULLNAME:
                full_name = _decode_general_names(
                    backend, cdp.distpoint.name.fullname
                )
            # OpenSSL code doesn't test for a specific type for
            # relativename, everything that isn't fullname is considered
            # relativename.  Per RFC 5280:
            #
            # DistributionPointName ::= CHOICE {
            #      fullName                [0]      GeneralNames,
            #      nameRelativeToCRLIssuer [1]      RelativeDistinguishedName }
            else:
                rns = cdp.distpoint.name.relativename
                rnum = backend._lib.sk_X509_NAME_ENTRY_num(rns)
                attributes = set()
                for i in range(rnum):
                    rn = backend._lib.sk_X509_NAME_ENTRY_value(
                        rns, i
                    )
                    backend.openssl_assert(rn != backend._ffi.NULL)
                    attributes.add(
                        _decode_x509_name_entry(backend, rn)
                    )

                relative_name = x509.RelativeDistinguishedName(attributes)

        dist_points.append(
            x509.DistributionPoint(
                full_name, relative_name, reasons, crl_issuer
            )
        )

    return dist_points


def _decode_crl_distribution_points(backend, cdps):
    dist_points = _decode_dist_points(backend, cdps)
    return x509.CRLDistributionPoints(dist_points)


def _decode_freshest_crl(backend, cdps):
    dist_points = _decode_dist_points(backend, cdps)
    return x509.FreshestCRL(dist_points)


def _decode_inhibit_any_policy(backend, asn1_int):
    asn1_int = backend._ffi.cast("ASN1_INTEGER *", asn1_int)
    asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
    skip_certs = _asn1_integer_to_int(backend, asn1_int)
    return x509.InhibitAnyPolicy(skip_certs)


def _decode_precert_signed_certificate_timestamps(backend, asn1_scts):
    from cryptography.hazmat.backends.openssl.x509 import (
        _SignedCertificateTimestamp
    )
    asn1_scts = backend._ffi.cast("Cryptography_STACK_OF_SCT *", asn1_scts)
    asn1_scts = backend._ffi.gc(asn1_scts, backend._lib.SCT_LIST_free)

    scts = []
    for i in range(backend._lib.sk_SCT_num(asn1_scts)):
        sct = backend._lib.sk_SCT_value(asn1_scts, i)

        scts.append(_SignedCertificateTimestamp(backend, asn1_scts, sct))
    return x509.PrecertificateSignedCertificateTimestamps(scts)


#    CRLReason ::= ENUMERATED {
#        unspecified             (0),
#        keyCompromise           (1),
#        cACompromise            (2),
#        affiliationChanged      (3),
#        superseded              (4),
#        cessationOfOperation    (5),
#        certificateHold         (6),
#             -- value 7 is not used
#        removeFromCRL           (8),
#        privilegeWithdrawn      (9),
#        aACompromise           (10) }
_CRL_ENTRY_REASON_CODE_TO_ENUM = {
    0: x509.ReasonFlags.unspecified,
    1: x509.ReasonFlags.key_compromise,
    2: x509.ReasonFlags.ca_compromise,
    3: x509.ReasonFlags.affiliation_changed,
    4: x509.ReasonFlags.superseded,
    5: x509.ReasonFlags.cessation_of_operation,
    6: x509.ReasonFlags.certificate_hold,
    8: x509.ReasonFlags.remove_from_crl,
    9: x509.ReasonFlags.privilege_withdrawn,
    10: x509.ReasonFlags.aa_compromise,
}


_CRL_ENTRY_REASON_ENUM_TO_CODE = {
    x509.ReasonFlags.unspecified: 0,
    x509.ReasonFlags.key_compromise: 1,
    x509.ReasonFlags.ca_compromise: 2,
    x509.ReasonFlags.affiliation_changed: 3,
    x509.ReasonFlags.superseded: 4,
    x509.ReasonFlags.cessation_of_operation: 5,
    x509.ReasonFlags.certificate_hold: 6,
    x509.ReasonFlags.remove_from_crl: 8,
    x509.ReasonFlags.privilege_withdrawn: 9,
    x509.ReasonFlags.aa_compromise: 10
}


def _decode_crl_reason(backend, enum):
    enum = backend._ffi.cast("ASN1_ENUMERATED *", enum)
    enum = backend._ffi.gc(enum, backend._lib.ASN1_ENUMERATED_free)
    code = backend._lib.ASN1_ENUMERATED_get(enum)

    try:
        return x509.CRLReason(_CRL_ENTRY_REASON_CODE_TO_ENUM[code])
    except KeyError:
        raise ValueError("Unsupported reason code: {0}".format(code))


def _decode_invalidity_date(backend, inv_date):
    generalized_time = backend._ffi.cast(
        "ASN1_GENERALIZEDTIME *", inv_date
    )
    generalized_time = backend._ffi.gc(
        generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free
    )
    return x509.InvalidityDate(
        _parse_asn1_generalized_time(backend, generalized_time)
    )


def _decode_cert_issuer(backend, gns):
    gns = backend._ffi.cast("GENERAL_NAMES *", gns)
    gns = backend._ffi.gc(gns, backend._lib.GENERAL_NAMES_free)
    general_names = _decode_general_names(backend, gns)
    return x509.CertificateIssuer(general_names)


def _asn1_to_der(backend, asn1_type):
    buf = backend._ffi.new("unsigned char **")
    res = backend._lib.i2d_ASN1_TYPE(asn1_type, buf)
    backend.openssl_assert(res >= 0)
    backend.openssl_assert(buf[0] != backend._ffi.NULL)
    buf = backend._ffi.gc(
        buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
    )
    return backend._ffi.buffer(buf[0], res)[:]


def _asn1_integer_to_int(backend, asn1_int):
    bn = backend._lib.ASN1_INTEGER_to_BN(asn1_int, backend._ffi.NULL)
    backend.openssl_assert(bn != backend._ffi.NULL)
    bn = backend._ffi.gc(bn, backend._lib.BN_free)
    return backend._bn_to_int(bn)


def _asn1_integer_to_int_or_none(backend, asn1_int):
    if asn1_int == backend._ffi.NULL:
        return None
    else:
        return _asn1_integer_to_int(backend, asn1_int)


def _asn1_string_to_bytes(backend, asn1_string):
    return backend._ffi.buffer(asn1_string.data, asn1_string.length)[:]


def _asn1_string_to_ascii(backend, asn1_string):
    return _asn1_string_to_bytes(backend, asn1_string).decode("ascii")


def _asn1_string_to_utf8(backend, asn1_string):
    buf = backend._ffi.new("unsigned char **")
    res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
    if res == -1:
        raise ValueError(
            "Unsupported ASN1 string type. Type: {0}".format(asn1_string.type)
        )

    backend.openssl_assert(buf[0] != backend._ffi.NULL)
    buf = backend._ffi.gc(
        buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
    )
    return backend._ffi.buffer(buf[0], res)[:].decode('utf8')


def _parse_asn1_time(backend, asn1_time):
    backend.openssl_assert(asn1_time != backend._ffi.NULL)
    generalized_time = backend._lib.ASN1_TIME_to_generalizedtime(
        asn1_time, backend._ffi.NULL
    )
    backend.openssl_assert(generalized_time != backend._ffi.NULL)
    generalized_time = backend._ffi.gc(
        generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free
    )
    return _parse_asn1_generalized_time(backend, generalized_time)


def _parse_asn1_generalized_time(backend, generalized_time):
    time = _asn1_string_to_ascii(
        backend, backend._ffi.cast("ASN1_STRING *", generalized_time)
    )
    return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ")


_EXTENSION_HANDLERS_NO_SCT = {
    ExtensionOID.BASIC_CONSTRAINTS: _decode_basic_constraints,
    ExtensionOID.SUBJECT_KEY_IDENTIFIER: _decode_subject_key_identifier,
    ExtensionOID.KEY_USAGE: _decode_key_usage,
    ExtensionOID.SUBJECT_ALTERNATIVE_NAME: _decode_subject_alt_name,
    ExtensionOID.EXTENDED_KEY_USAGE: _decode_extended_key_usage,
    ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier,
    ExtensionOID.AUTHORITY_INFORMATION_ACCESS: (
        _decode_authority_information_access
    ),
    ExtensionOID.CERTIFICATE_POLICIES: _decode_certificate_policies,
    ExtensionOID.CRL_DISTRIBUTION_POINTS: _decode_crl_distribution_points,
    ExtensionOID.FRESHEST_CRL: _decode_freshest_crl,
    ExtensionOID.OCSP_NO_CHECK: _decode_ocsp_no_check,
    ExtensionOID.INHIBIT_ANY_POLICY: _decode_inhibit_any_policy,
    ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name,
    ExtensionOID.NAME_CONSTRAINTS: _decode_name_constraints,
    ExtensionOID.POLICY_CONSTRAINTS: _decode_policy_constraints,
}
_EXTENSION_HANDLERS = _EXTENSION_HANDLERS_NO_SCT.copy()
_EXTENSION_HANDLERS[
    ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS
] = _decode_precert_signed_certificate_timestamps


_REVOKED_EXTENSION_HANDLERS = {
    CRLEntryExtensionOID.CRL_REASON: _decode_crl_reason,
    CRLEntryExtensionOID.INVALIDITY_DATE: _decode_invalidity_date,
    CRLEntryExtensionOID.CERTIFICATE_ISSUER: _decode_cert_issuer,
}

_CRL_EXTENSION_HANDLERS = {
    ExtensionOID.CRL_NUMBER: _decode_crl_number,
    ExtensionOID.DELTA_CRL_INDICATOR: _decode_delta_crl_indicator,
    ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier,
    ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name,
    ExtensionOID.AUTHORITY_INFORMATION_ACCESS: (
        _decode_authority_information_access
    ),
}

_CERTIFICATE_EXTENSION_PARSER_NO_SCT = _X509ExtensionParser(
    ext_count=lambda backend, x: backend._lib.X509_get_ext_count(x),
    get_ext=lambda backend, x, i: backend._lib.X509_get_ext(x, i),
    handlers=_EXTENSION_HANDLERS_NO_SCT
)

_CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser(
    ext_count=lambda backend, x: backend._lib.X509_get_ext_count(x),
    get_ext=lambda backend, x, i: backend._lib.X509_get_ext(x, i),
    handlers=_EXTENSION_HANDLERS
)

_CSR_EXTENSION_PARSER = _X509ExtensionParser(
    ext_count=lambda backend, x: backend._lib.sk_X509_EXTENSION_num(x),
    get_ext=lambda backend, x, i: backend._lib.sk_X509_EXTENSION_value(x, i),
    handlers=_EXTENSION_HANDLERS
)

_REVOKED_CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser(
    ext_count=lambda backend, x: backend._lib.X509_REVOKED_get_ext_count(x),
    get_ext=lambda backend, x, i: backend._lib.X509_REVOKED_get_ext(x, i),
    handlers=_REVOKED_EXTENSION_HANDLERS,
)

_CRL_EXTENSION_PARSER = _X509ExtensionParser(
    ext_count=lambda backend, x: backend._lib.X509_CRL_get_ext_count(x),
    get_ext=lambda backend, x, i: backend._lib.X509_CRL_get_ext(x, i),
    handlers=_CRL_EXTENSION_HANDLERS,
)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 336 B 0644
aead.py File 5.45 KB 0644
backend.py File 73.75 KB 0644
ciphers.py File 8.87 KB 0644
cmac.py File 2.72 KB 0644
decode_asn1.py File 30.09 KB 0644
dh.py File 10.59 KB 0644
dsa.py File 10.04 KB 0644
ec.py File 10.04 KB 0644
encode_asn1.py File 21.76 KB 0644
hashes.py File 2.47 KB 0644
hmac.py File 2.89 KB 0644
rsa.py File 17.16 KB 0644
utils.py File 1.37 KB 0644
x25519.py File 2.73 KB 0644
x509.py File 17.81 KB 0644