404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.147.71.116: ~ $
import base64
import json

import sys
import gnupg
import requests

"""
When using alternative routing, we want to verify as little data as possible. Thus we'll
end up relying mostly on tls key pinning. If we don't disable warnings, a warning will be
constantly popping on the terminal informing the user about it.
https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
"""
import urllib3

urllib3.disable_warnings()

from concurrent.futures import ThreadPoolExecutor


from .cert_pinning import TLSPinningAdapter
from .constants import (ALT_HASH_DICT, DEFAULT_TIMEOUT, DNS_HOSTS,
                        ENCODED_URLS, SRP_MODULUS_KEY,
                        SRP_MODULUS_KEY_FINGERPRINT)
from .exceptions import (ConnectionTimeOutError, NetworkError,
                         NewConnectionError, ProtonAPIError, TLSPinningError,
                         UnknownConnectionError, MissingDepedencyError)
from .logger import CustomLogger
from .metadata import MetadataBackend
from .srp import User as PmsrpUser


class Session:
    """A Proton Session.

    Provides public key pinning, fetch alternative routes and connect to
    Proton API in a authenticated manner, dump and load sessions.

    All this is possible since it serves as a wrapper for `<Requests>`

    Basic Usage:

      >>> import proton
      >>> s = proton.Session("https://url-to-api.ch")
      >>> s.enable_alternative_routing = True
      >>> s.api_request("/api/endpoint")
      <Response [200]>
    """
    _base_headers = {
        "x-pm-apiversion": "3",
        "Accept": "application/vnd.protonmail.v1+json"
    }
    __force_skip_alternative_routing = False

    @staticmethod
    def load(
        dump, log_dir_path, cache_dir_path,
        tls_pinning=True, timeout=DEFAULT_TIMEOUT,
        proxies=None
    ):
        """Load session from file/keyring.

        This should load the output generated by dump().

        Args:
            log_dir_path (str): path to desired logging directory
            cache_dir_path (str): path to desired cache directory
            tls_pinning (bool): tls pinning
            timeout (tuple|int|float): How long to wait for the server to send
            data before giving up.
            proxies (dict): desired proxies

        Returns:
            proton.Session
        """
        api_url = dump["api_url"]
        appversion = dump["appversion"]
        user_agent = dump["User-Agent"]
        cookies = dump.get("cookies", {})
        s = Session(
            api_url=api_url,
            log_dir_path=log_dir_path,
            cache_dir_path=cache_dir_path,
            appversion=appversion,
            user_agent=user_agent,
            tls_pinning=tls_pinning,
            timeout=timeout,
            proxies=proxies
        )
        requests.utils.add_dict_to_cookiejar(s.s.cookies, cookies)
        s._session_data = dump["session_data"]
        if s.UID is not None:
            s.s.headers["x-pm-uid"] = s.UID
            s.s.headers["Authorization"] = "Bearer " + s.AccessToken
        return s

    def dump(self):
        """Dump session.

        If you want to reuse the session, then dump it and store the values
        somewhere safe.

        Returns:
            dict
        """
        return {
            "api_url": self.__api_url,
            "appversion": self.__appversion,
            "User-Agent": self.__user_agent,
            "cookies": self.s.cookies.get_dict(),
            "session_data": self._session_data
        }

    def __init__(
        self, api_url, log_dir_path, cache_dir_path,
        appversion="Other", user_agent="None",
        tls_pinning=True, ClientSecret=None, timeout=DEFAULT_TIMEOUT,
        proxies=None
    ):
        """Constructs a new Session object.

        Args:
            api_url (string): URL for the new Session object
            appversion (string): version for the new Session object
            user_agent (string): user agent for the new Session` object
            should be in the following syntax:
                - Linux based -> ClientName/client.version (Linux; Distro/distro_version)
                - Non-linux based -> ClientName/client.version (OS)
            tls_pinning (bool): wether tls pinning should be enabled for the new
                Session object.
            ClientSecret (string): secret token for the new Session object that
                is added to the payload with key `ClientSecret`. [OPTIONAL]
            timeout (int|float|tuple): How long to wait for the server to send
                data before giving up. [OPTIONAL]
            proxies (dict): proxies to be used by the new Session object.
                This is mutually exclusive with `tls_pinning`. [OPTIONAL]
        """
        self.__api_url = api_url
        self.__appversion = appversion
        self.__user_agent = user_agent
        self.__clientsecret = ClientSecret
        self.__timeout = timeout
        self.__tls_pinning_enabled = tls_pinning
        self._logger = CustomLogger()
        self._logger.set_log_path(log_dir_path)
        self._logger = self._logger.logger
        self.__metadata = MetadataBackend.get_backend()
        self.__metadata.cache_dir_path = cache_dir_path
        self.__metadata.logger = self._logger
        self.__allow_alternative_routing = None

        # Verify modulus
        self.__gnupg = gnupg.GPG()
        self.__gnupg.import_keys(SRP_MODULUS_KEY)

        self._session_data = {}

        self.s = requests.Session()

        if proxies and self.__tls_pinning_enabled:
            raise RuntimeError("Not allowed to add proxies while TLS Pinning is enabled")

        self.s.proxies = proxies

        if self.__tls_pinning_enabled:
            self.s.mount(self.__api_url, TLSPinningAdapter())

        self.s.headers["x-pm-appversion"] = appversion
        self.s.headers["User-Agent"] = user_agent

    def api_request(
        self, endpoint,
        jsondata=None, additional_headers=None,
        method=None, params=None, _skip_alt_routing_for_api_check=False
    ):
        """Make API request.

        Args:
            endpoint (string): API endpoint.
            jsondata (json): json to send in the body.
            additional_headers (dict): additional (dictionary of) headers to send.
            method (string): get|post|put|delete|patch.
            params (dict|tuple): URL parameters to append to the URL. If a dictionary or
                list of tuples ``[(key, value)]`` is provided, form-encoding will
                take place.
            _skip_alt_routing_for_api_check (bool): used to temporarly skip alt routing.

        Returns:
            requests.Response
        """
        if self.__allow_alternative_routing is None:
            msg = "Alternative routing has not been configured before making API requests. " \
                "Please either enable or disable it before making any requests."
            self._logger.info(msg)
            raise RuntimeError(msg)

        fct = self.s.post

        if method is None:
            if jsondata is None:
                fct = self.s.get
            else:
                fct = self.s.post
        else:
            fct = {
                "get": self.s.get,
                "post": self.s.post,
                "put": self.s.put,
                "delete": self.s.delete,
                "patch": self.s.patch
            }.get(method.lower())

        if fct is None:
            raise ValueError("Unknown method: {}".format(method))

        _url = self.__api_url
        _verify = True

        if not self.__metadata.try_original_url(
            self.__allow_alternative_routing,
            self.__force_skip_alternative_routing
        ):
            _url = self.__metadata.get_alternative_url()
            _verify = False

        request_params = {
            "url": _url,
            "endpoint": endpoint,
            "headers": additional_headers,
            "json": jsondata,
            "timeout": self.__timeout,
            "verify": _verify,
            "params": params
        }

        exception_class = None
        exception_msg = None

        try:
            response = self.__make_request(fct, **request_params)
        except (
            NewConnectionError,
            ConnectionTimeOutError,
            TLSPinningError,
        ) as e:
            self._logger.exception(e)
            exc_type, *_ = sys.exc_info()
            exception_class = exc_type
            exception_msg = e
        except (Exception, requests.exceptions.BaseHTTPError) as e:
            self._logger.exception(e)
            raise UnknownConnectionError(e)

        if exception_class and (not self.__allow_alternative_routing or _skip_alt_routing_for_api_check or self.__force_skip_alternative_routing): # noqa
            self._logger.info("{}: {}".format(exception_class, exception_msg))
            raise exception_class(exception_msg)
        elif (
            exception_class in [NewConnectionError, ConnectionTimeOutError, TLSPinningError]
            and not self._is_api_reacheable()
        ):
            response = self.__try_with_alt_routing(fct, **request_params)

        try:
            status_code = response.status_code
        except: # noqa
            status_code = False

        try:
            json_error = False
            response = response.json()
        except json.decoder.JSONDecodeError as e:
            json_error = e

        if json_error and status_code != 200:
            self._logger.exception(json_error)
            raise ProtonAPIError(
                {
                    "Code": response.status_code,
                    "Error": response.reason,
                    "Headers": response.headers
                }
            )

        # This check is needed for routers or any other clients that will ask for other
        # data that is not provided in json format, such as when asking /vpn/config for
        # a .ovpn template
        try:
            if response["Code"] not in [1000, 1001]:
                if response["Code"] == 9001:
                    self.__captcha_token = response["Details"]["HumanVerificationToken"]
                elif response["Code"] == 12087:
                    del self.human_verification_token

                raise ProtonAPIError(response)
        except TypeError as e:
            if status_code != 200:
                raise TypeError(e)

        return response

    def __try_with_alt_routing(self, fct, **request_params):
        alternative_routes = self.get_alternative_routes_from_dns()

        request_params["verify"] = False
        response = None

        for route in alternative_routes:
            _alt_url = "https://{}".format(route)
            request_params["url"] = _alt_url

            if self.__tls_pinning_enabled:
                self.s.mount(_alt_url, TLSPinningAdapter(ALT_HASH_DICT))

            self._logger.info("Trying {}".format(_alt_url))
            try:
                response = self.__make_request(fct, **request_params)
            except Exception as e: # noqa
                self._logger.exception(e)
                continue
            else:
                self._logger.info("Storing alternative route: {}".format(_alt_url))
                self.__metadata.store_alternative_route(_alt_url)
                break

        if not response:
            self._logger.info("Possible network error, unable to reach API")
            raise NetworkError("Network error")

        return response

    def __make_request(self, fct, **kwargs):
        _endpoint = kwargs.pop("endpoint")
        _url = kwargs["url"]

        kwargs["url"] = _url + _endpoint
        try:
            ret = fct(**kwargs)
        except requests.exceptions.ConnectionError as e:
            raise NewConnectionError(e)
        except requests.exceptions.Timeout as e:
            raise ConnectionTimeOutError(e)
        except TLSPinningError as e:
            raise TLSPinningError(e)
        except (Exception, requests.exceptions.BaseHTTPError) as e:
            raise UnknownConnectionError(e)

        return ret

    def _is_api_reacheable(self):
        try:
            self.api_request("/tests/ping", _skip_alt_routing_for_api_check=True)
        except (NewConnectionError, ConnectionTimeOutError, TLSPinningError) as e:
            self._logger.exception(e)
            return False

        return True

    def verify_modulus(self, armored_modulus):
        # gpg.decrypt verifies the signature too, and returns the parsed data.
        # By using gpg.verify the data is not returned
        verified = self.__gnupg.decrypt(armored_modulus)

        if not (verified.valid and verified.fingerprint.lower() == SRP_MODULUS_KEY_FINGERPRINT):
            raise ValueError("Invalid modulus")

        return base64.b64decode(verified.data.strip())

    def authenticate(self, username, password):
        """Authenticate user against API.

        Args:
            username (string): proton account username
            password (string): proton account password

        Returns:
            dict

        The returning dict contains the Scope of the account. This allows
        to identify if the account is locked, has unpaid invoices, etc.
        """
        self.logout()

        payload = {"Username": username}
        if self.__clientsecret:
            payload["ClientSecret"] = self.__clientsecret

        info_response = self.api_request("/auth/info", payload)

        modulus = self.verify_modulus(info_response["Modulus"])
        server_challenge = base64.b64decode(info_response["ServerEphemeral"])
        salt = base64.b64decode(info_response["Salt"])
        version = info_response["Version"]

        usr = PmsrpUser(password, modulus)
        client_challenge = usr.get_challenge()
        client_proof = usr.process_challenge(salt, server_challenge, version)

        if client_proof is None:
            raise ValueError("Invalid challenge")

        # Send response
        payload = {
            "Username": username,
            "ClientEphemeral": base64.b64encode(client_challenge).decode(
                "utf8"
            ),
            "ClientProof": base64.b64encode(client_proof).decode("utf8"),
            "SRPSession": info_response["SRPSession"],
        }
        if self.__clientsecret:
            payload["ClientSecret"] = self.__clientsecret

        auth_response = self.api_request("/auth", payload)

        if "ServerProof" not in auth_response:
            raise ValueError("Invalid password")

        usr.verify_session(base64.b64decode(auth_response["ServerProof"]))
        if not usr.authenticated():
            raise ValueError("Invalid server proof")

        self._session_data = {
            "UID": auth_response["UID"],
            "AccessToken": auth_response["AccessToken"],
            "RefreshToken": auth_response["RefreshToken"],
            "PasswordMode": auth_response["PasswordMode"],
            "Scope": auth_response["Scope"].split(),
        }

        if self.UID is not None:
            self.s.headers["x-pm-uid"] = self.UID
            self.s.headers["Authorization"] = "Bearer " + self.AccessToken

        return self.Scope

    def provide_2fa(self, code):
        """Provide Two Factor Authentication Code to the API.

        Args:
            code (string): string of ints

        Returns:
            dict

        The returning dict contains the Scope of the account. This allows
        to identify if the account is locked, has unpaid invoices, etc.
        """
        ret = self.api_request("/auth/2fa", {"TwoFactorCode": code})
        self._session_data["Scope"] = ret["Scope"]

        return self.Scope

    def logout(self):
        """Logout from API."""
        if self._session_data:
            self.api_request("/auth", method="DELETE")
            del self.s.headers["Authorization"]
            del self.s.headers["x-pm-uid"]
            self._session_data = {}

    def refresh(self):
        """Refresh tokens.

        Refresh AccessToken with a valid RefreshToken.
        If the RefreshToken is invalid then the user will have to
        re-authenticate.
        """
        refresh_response = self.api_request(
            "/auth/refresh",
            {
                "ResponseType": "token",
                "GrantType": "refresh_token",
                "RefreshToken": self.RefreshToken,
                "RedirectURI": "http://protonmail.ch"
            }
        )
        self._session_data["AccessToken"] = refresh_response["AccessToken"]
        self._session_data["RefreshToken"] = refresh_response["RefreshToken"]
        self.s.headers["Authorization"] = "Bearer " + self.AccessToken

    def get_alternative_routes_from_dns(self, callback=None):
        """Get alternative routes to circumvent firewalls and API restrictions.

        Args:
            callback (func): a callback method to be called.
                Might be usefull for multi-threading. [OPTIONAL]

        This method leverages the power of ThreadPoolExecutor to async
        check if the provided dns hosts can be reached, and if so, collect the
        alternatives routes provided by them.
        The encoded url are done sync because most often one of the two should work,
        as it should provide the data as quick as possible.

        If callback is passed then the method does not return any value, otherwise it
        returns a set().
        """

        try:
            from dns import message
            from dns.rdatatype import TXT
        except ImportError as e:
            self._logger.exception(e)
            raise MissingDepedencyError(
                "Could not find dnspython package. "
                "Please either install the missing package or disable "
                "alternative routing."
            )

        routes = set()

        for encoded_url in ENCODED_URLS:
            dns_query, dns_encoded_data = self.__generate_dns_message(encoded_url)
            dns_hosts_response = []

            host_and_dns = [(host, dns_encoded_data) for host in DNS_HOSTS]

            with ThreadPoolExecutor(max_workers=len(DNS_HOSTS)) as executor:
                dns_hosts_response = list(
                    executor.map(self.__query_for_dns_data, host_and_dns, timeout=20)
                )
                dns_hosts_response = [dns_url for dns_url in dns_hosts_response if dns_url]

            if len(dns_hosts_response) == 0:
                continue

            for response in dns_hosts_response:
                routes = self.__extract_dns_answer(response, dns_query)

            if len(routes) > 0:
                break

        if not callback:
            return routes

        callback(routes)

    def __generate_dns_message(self, encoded_url):
        """Generate DNS message object.

        Args:
            encoded_url (string): encoded url as per documentation

        Returns:
            tuple():
                dns_query (dns.message.Message): output of dns.message.make_query
                base64_dns_message (base64): encode bytes
        """
        from dns import message
        from dns.rdatatype import TXT

        dns_query = message.make_query(encoded_url, TXT)
        dns_wire = dns_query.to_wire()
        base64_dns_message = base64.urlsafe_b64encode(dns_wire).rstrip(b"=")

        return dns_query, base64_dns_message

    def __query_for_dns_data(self, dns_settings):
        """Query DNS host for data.

        Args:
            dns_settings (tuple):
                host_url (str): http/https url
                dns_encoded_data (str): base64 output
                generate by __generate_dns_message()

        This method uses requests.get to query the url
        for dns data.

        Returns:
            bytes: content of the response
        """
        dns_host, dns_encoded_data = dns_settings[0], dns_settings[1]
        try:
            response = requests.get(
                dns_host,
                headers={"accept": "application/dns-message"},
                timeout=(3.05, 16.95),
                params={"dns": dns_encoded_data}
            )

            if response.status_code == 404:
                return
        except Exception as e: # noqa
            return

        return response.content

    def __extract_dns_answer(self, query_content, dns_query):
        """Extract alternative URL from dns message.

        Args:
            query_content (bytes): content of the response
            dns_query (dns.message.Message): output of dns.message.make_query

        Returns:
            set(): alternative routes for API
        """
        from dns import message
        r = message.from_wire(
            query_content,
            keyring=dns_query.keyring,
            request_mac=dns_query.request_mac,
            one_rr_per_rrset=False,
            ignore_trailing=False
        )
        routes = set()
        for route in r.answer:
            routes = set([str(url).strip("\"") for url in route])

        return routes

    @property
    def captcha_url(self):
        return "{}/core/v4/captcha?Token={}".format(
            self.__api_url, self.__captcha_token
        )

    @property
    def enable_alternative_routing(self):
        """Alternative routing getter."""
        return self.__allow_alternative_routing

    @enable_alternative_routing.setter
    def enable_alternative_routing(self, newvalue):
        """Alternative routing setter.

        If you would like to enable/disable alternative routing
        before making any requests, this should be set to the desired
        value.

        Args:
            newvalue (bool)
        """
        if self.__allow_alternative_routing != bool(newvalue):
            self.__allow_alternative_routing = bool(newvalue)

    @property
    def force_skip_alternative_routing(self):
        """Force skip alternative routing getter."""
        return self.__force_skip_alternative_routing

    @force_skip_alternative_routing.setter
    def force_skip_alternative_routing(self, newvalue):
        """Force skip alternative routing setter.

        Alternative routing is normally used when the usual API is not
        reacheable. In certain cases, such as when connected to the VPN,
        the usual API should be reacheable as the connection is tunneled,
        thus there is not need to reach for the alternative routes and the
        usual API is preffered to be used, for security and reliability.

        Args:
            newvalue (bool)
        """
        self.__force_skip_alternative_routing = bool(newvalue)

    @property
    def human_verification_token(self):
        return (
            self.s.headers.get("X-PM-Human-Verification-Token-Type", None),
            self.s.headers.get("X-PM-Human-Verification-Token", None)
        )

    @human_verification_token.setter
    def human_verification_token(self, newtuplevalue):
        """Set human verification token:

        Args:
            newtuplevalue (tuple): (token_type, token_value)
        """
        self.s.headers["X-PM-Human-Verification-Token-Type"] = newtuplevalue[0]
        self.s.headers["X-PM-Human-Verification-Token"] = newtuplevalue[1]

    @human_verification_token.deleter
    def human_verification_token(self):
        # Safest to use .pop() as it will onyl attempt to remove the key by name
        # while del can also remove the whole dict (in case of code/programming error)
        # Thus to prevent this, pop() is used.

        try:
            self.s.headers.pop("X-PM-Human-Verification-Token-Type")
        except (KeyError, IndexError):
            pass

        try:
            self.s.headers.pop("X-PM-Human-Verification-Token")
        except (KeyError, IndexError):
            pass

    @property
    def UID(self):
        return self._session_data.get("UID", None)

    @property
    def AccessToken(self):
        return self._session_data.get("AccessToken", None)

    @property
    def RefreshToken(self):
        return self._session_data.get("RefreshToken", None)

    @property
    def PasswordMode(self):
        return self._session_data.get("PasswordMode", None)

    @property
    def Scope(self):
        return self._session_data.get("Scope", [])

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
metadata Folder 0755
srp Folder 0755
__init__.py File 78 B 0644
api.py File 23.7 KB 0644
cert_pinning.py File 5.45 KB 0644
constants.py File 1.55 KB 0644
exceptions.py File 1 KB 0644
logger.py File 1.36 KB 0644
utils.py File 962 B 0644