404

[ Avaa Bypassed ]




Upload:

Command:

botdev@18.223.241.144: ~ $
from ipaddress import ip_network

import dbus
from dbus.mainloop.glib import DBusGMainLoop

from ... import exceptions
from ...constants import (KILLSWITCH_CONN_NAME, KILLSWITCH_INTERFACE_NAME,
                          ROUTED_CONN_NAME, ROUTED_INTERFACE_NAME,
                          IPv4_DUMMY_ADDRESS, IPv4_DUMMY_GATEWAY,
                          IPv6_DUMMY_ADDRESS, IPv6_DUMMY_GATEWAY, KILLSWITCH_DNS_PRIORITY_VALUE)
from ...enums import (KillSwitchActionEnum, KillSwitchInterfaceTrackerEnum,
                      KillswitchStatusEnum)
from ...logger import logger
from ..dbus.dbus_network_manager_wrapper import NetworkManagerUnitWrapper
from ..subprocess_wrapper import subprocess


class KillSwitch:
    # Additional loop needs to be create since SystemBus automatically
    # picks the default loop, which is intialized with the CLI.
    # Thus, to refrain SystemBus from using the default loop,
    # one extra loop is needed only to be passed, while it is never used.
    # https://dbus.freedesktop.org/doc/dbus-python/tutorial.html#setting-up-an-event-loop
    dbus_loop = DBusGMainLoop()
    bus = dbus.SystemBus(mainloop=dbus_loop)

    """Manages killswitch connection/interfaces."""
    def __init__(
        self,
        nm_wrapper=NetworkManagerUnitWrapper,
        ks_conn_name=KILLSWITCH_CONN_NAME,
        ks_interface_name=KILLSWITCH_INTERFACE_NAME,
        routed_conn_name=ROUTED_CONN_NAME,
        routed_interface_name=ROUTED_INTERFACE_NAME,
        ipv4_dummy_addrs=IPv4_DUMMY_ADDRESS,
        ipv4_dummy_gateway=IPv4_DUMMY_GATEWAY,
        ipv6_dummy_addrs=IPv6_DUMMY_ADDRESS,
        ipv6_dummy_gateway=IPv6_DUMMY_GATEWAY,
    ):
        self.ks_conn_name = ks_conn_name
        self.ks_interface_name = ks_interface_name
        self.routed_conn_name = routed_conn_name
        self.routed_interface_name = routed_interface_name
        self.ipv4_dummy_addrs = ipv4_dummy_addrs
        self.ipv4_dummy_gateway = ipv4_dummy_gateway
        self.ipv6_dummy_addrs = ipv6_dummy_addrs
        self.ipv6_dummy_gateway = ipv6_dummy_gateway
        self.nm_wrapper = nm_wrapper(self.bus)
        self.interface_state_tracker = {
            self.ks_conn_name: {
                KillSwitchInterfaceTrackerEnum.EXISTS: False,
                KillSwitchInterfaceTrackerEnum.IS_RUNNING: False
            },
            self.routed_conn_name: {
                KillSwitchInterfaceTrackerEnum.EXISTS: False,
                KillSwitchInterfaceTrackerEnum.IS_RUNNING: False
            }
        }

        logger.info("Initialized killswitch manager")
        self.get_status_connectivity_check()

    def manage(self, action, server_ip=None):
        """Manage killswitch.

        Args:
            action (string|int): either pre_connection or post_connection
            is_menu (bool): if the action comes from configurations menu,
                if so, then action is int
            server_ip (string): server ip to be connected to
        """
        logger.info(
            "Manage Killswitch action: {}".format(
                action,
            )
        )

        self._ensure_connectivity_check_is_disabled()

        self.update_connection_status()

        actions_dict = {
            KillSwitchActionEnum.PRE_CONNECTION:
            self.setup_pre_connection_ks,
            KillSwitchActionEnum.POST_CONNECTION:
            self.setup_post_connection_ks,
            KillSwitchActionEnum.SOFT: self.setup_soft_connection,
            KillSwitchActionEnum.DISABLE: self.delete_all_connections

        }[action](server_ip)

    def update_from_user_configuration_menu(self, action):
        logger.info(
            "Update from menu killswitch action: {}".format(
                action,
            )
        )

        self._ensure_connectivity_check_is_disabled()
        self.update_connection_status()

        if action == KillswitchStatusEnum.HARD:
            try:
                self.delete_connection(self.routed_conn_name)
            except: # noqa
                pass

            if not self.interface_state_tracker[self.ks_conn_name][
                KillSwitchInterfaceTrackerEnum.EXISTS
            ]:
                self.create_killswitch_connection()
                return
            else:
                self.activate_connection(self.ks_conn_name)
        elif action in [
            KillswitchStatusEnum.SOFT, KillswitchStatusEnum.DISABLED
        ]:
            self.delete_all_connections()
        else:
            raise exceptions.KillswitchError(
                "Incorrect option for killswitch manager"
            )

    def setup_pre_connection_ks(self, server_ip, pre_attempts=0):
        """Assure pre-connection Kill Switch is setup correctly.

        Args:
            server_ip (list | string): Proton VPN server IP
            pre_attempts (int): number of setup attempts
        """
        self.update_connection_status()

        if pre_attempts >= 5:
            raise exceptions.KillswitchError(
                "Unable to setup pre-connection ks. "
                "Exceeded maximum attempts."
            )

        logger.info("Pre-setup attempts: {}".format(pre_attempts))

        # happy path
        if (
            self.interface_state_tracker[self.ks_conn_name][
                KillSwitchInterfaceTrackerEnum.IS_RUNNING
            ]
            and not self.interface_state_tracker[self.routed_conn_name][
                KillSwitchInterfaceTrackerEnum.EXISTS
            ]
        ):
            logger.info("Following happy path for pre setup")
            self.create_routed_connection(server_ip)
            self.deactivate_connection(self.ks_conn_name)
            return
        elif (
            not self.interface_state_tracker[self.ks_conn_name][
                KillSwitchInterfaceTrackerEnum.IS_RUNNING
            ]
            and self.interface_state_tracker[self.routed_conn_name][
                KillSwitchInterfaceTrackerEnum.IS_RUNNING
            ]
        ):
            logger.info("Both interfaces are correctly setup")
            return

        # check for routed ks and remove if present/running
        if (
            self.interface_state_tracker[self.routed_conn_name][
                KillSwitchInterfaceTrackerEnum.EXISTS
            ]
            and self.interface_state_tracker[self.routed_conn_name][
                KillSwitchInterfaceTrackerEnum.IS_RUNNING
            ]
        ):
            logger.info("Deleting routed kill switch interface")
            self.delete_connection(self.routed_conn_name)

        # check if ks exists. Start it if it does
        # if not then create and start it
        if (
            self.interface_state_tracker[
                self.ks_conn_name
            ][KillSwitchInterfaceTrackerEnum.EXISTS]
        ):
            logger.info("Activating kill switch interface")
            self.activate_connection(self.ks_conn_name)
        else:
            logger.info("Creating kill switch interface")
            self.create_killswitch_connection()

        pre_attempts += 1
        self.setup_pre_connection_ks(server_ip, pre_attempts=pre_attempts)

    def setup_post_connection_ks(
        self, _, post_attempts=0, activating_soft_connection=False
    ):
        """Assure post-connection Kill Switch is setup correctly.

        Args:
            post_attempts (int): number of setup attempts
        """
        self.update_connection_status()

        if post_attempts >= 5:
            raise exceptions.KillswitchError(
                "Unable to setup post-connection ks. "
                "Exceeded maximum attempts."
            )

        logger.info("Post-setup attempts: {}".format(post_attempts))

        # happy path
        if (
            not self.interface_state_tracker[self.ks_conn_name][
                KillSwitchInterfaceTrackerEnum.IS_RUNNING
            ]
            and self.interface_state_tracker[self.routed_conn_name][
                KillSwitchInterfaceTrackerEnum.IS_RUNNING
            ]
        ):
            logger.info("Following happy path for post setup")
            self.activate_connection(self.ks_conn_name)
            self.delete_connection(self.routed_conn_name)

            return
        elif (
            activating_soft_connection
            and (
                not self.interface_state_tracker[self.routed_conn_name][
                    KillSwitchInterfaceTrackerEnum.IS_RUNNING
                ] or not self.interface_state_tracker[self.routed_conn_name][
                    KillSwitchInterfaceTrackerEnum.EXISTS
                ]
            )
        ):
            logger.info("Following happy path for soft-connection post setup")
            self.activate_connection(self.ks_conn_name)
            return
        elif (
            self.interface_state_tracker[self.ks_conn_name][
                KillSwitchInterfaceTrackerEnum.IS_RUNNING
            ] and (
                not self.interface_state_tracker[self.routed_conn_name][
                    KillSwitchInterfaceTrackerEnum.EXISTS
                ] or not self.interface_state_tracker[self.routed_conn_name][
                    KillSwitchInterfaceTrackerEnum.IS_RUNNING
                ]
            )
        ):
            logger.info("Both interfaces are correctly setup")
            return

        # check for ks and disable it if is running
        if (
            self.interface_state_tracker[self.ks_conn_name][
                KillSwitchInterfaceTrackerEnum.IS_RUNNING
            ]
        ):
            logger.info("Deactivating kill switch interface")
            self.deactivate_connection(self.ks_conn_name)

        # check if routed ks exists, if so then activate it
        # else raise exception
        if (
            self.interface_state_tracker[self.routed_conn_name][KillSwitchInterfaceTrackerEnum.EXISTS] # noqa
        ):
            logger.info("Activating kill routed interface")
            self.activate_connection(self.routed_conn_name)
        else:
            raise Exception("Routed connection does not exist")

        post_attempts += 1
        self.setup_post_connection_ks(
            _, post_attempts=post_attempts,
            activating_soft_connection=activating_soft_connection
        )

    def setup_soft_connection(self, _):
        """Setup Kill Switch for --on setting."""
        self.create_killswitch_connection()
        self.setup_post_connection_ks(None, activating_soft_connection=True)

    def create_killswitch_connection(self):
        """Create killswitch connection/interface."""
        subprocess_command = [
            "nmcli", "c", "a", "type", "dummy",
            "ifname", self.ks_interface_name,
            "con-name", self.ks_conn_name,
            "ipv4.method", "manual",
            "ipv4.addresses", self.ipv4_dummy_addrs,
            "ipv4.gateway", self.ipv4_dummy_gateway,
            "ipv6.method", "manual",
            "ipv6.addresses", self.ipv6_dummy_addrs,
            "ipv6.gateway", self.ipv6_dummy_gateway,
            "ipv4.route-metric", "98",
            "ipv6.route-metric", "98",
            "ipv4.dns-priority", KILLSWITCH_DNS_PRIORITY_VALUE,
            "ipv6.dns-priority", KILLSWITCH_DNS_PRIORITY_VALUE,
            "ipv4.ignore-auto-dns", "yes",
            "ipv6.ignore-auto-dns", "yes",
            "ipv4.dns", "0.0.0.0",
            "ipv6.dns", "::1"
        ]
        self.update_connection_status()
        if not self.interface_state_tracker[self.ks_conn_name][
            KillSwitchInterfaceTrackerEnum.EXISTS
        ]:
            self.create_connection(
                self.ks_conn_name,
                "Unable to activate {}".format(self.ks_conn_name),
                subprocess_command, exceptions.CreateBlockingKillswitchError
            )

    def create_routed_connection(self, server_ip, try_route_addrs=False):
        """Create routed connection/interface.

        Args:
            server_ip (list(string)): the IP of the server to be connected
        """
        if isinstance(server_ip, list):
            server_ip = server_ip.pop()

        subnet_list = list(ip_network('0.0.0.0/0').address_exclude(
            ip_network(server_ip)
        ))

        route_data = [str(ipv4) for ipv4 in subnet_list]
        route_data_str = ",".join(route_data)

        subprocess_command = [
            "nmcli", "c", "a", "type", "dummy",
            "ifname", self.routed_interface_name,
            "con-name", self.routed_conn_name,
            "ipv4.method", "manual",
            "ipv4.addresses", self.ipv4_dummy_addrs,
            "ipv6.method", "manual",
            "ipv6.addresses", self.ipv6_dummy_addrs,
            "ipv6.gateway", self.ipv6_dummy_gateway,
            "ipv4.route-metric", "97",
            "ipv6.route-metric", "97",
            "ipv4.routes", route_data_str,
            "ipv4.dns-priority", KILLSWITCH_DNS_PRIORITY_VALUE,
            "ipv6.dns-priority", KILLSWITCH_DNS_PRIORITY_VALUE,
            "ipv4.ignore-auto-dns", "yes",
            "ipv6.ignore-auto-dns", "yes",
            "ipv4.dns", "0.0.0.0",
            "ipv6.dns", "::1"
        ]

        if try_route_addrs:
            subprocess_command = [
                "nmcli", "c", "a", "type", "dummy",
                "ifname", self.routed_interface_name,
                "con-name", self.routed_conn_name,
                "ipv4.method", "manual",
                "ipv4.addresses", route_data_str,
                "ipv6.method", "manual",
                "ipv6.addresses", self.ipv6_dummy_addrs,
                "ipv6.gateway", self.ipv6_dummy_gateway,
                "ipv4.route-metric", "97",
                "ipv6.route-metric", "97",
                "ipv4.dns-priority", KILLSWITCH_DNS_PRIORITY_VALUE,
                "ipv6.dns-priority", KILLSWITCH_DNS_PRIORITY_VALUE,
                "ipv4.ignore-auto-dns", "yes",
                "ipv6.ignore-auto-dns", "yes",
                "ipv4.dns", "0.0.0.0",
                "ipv6.dns", "::1"
            ]

        logger.info(subprocess_command)
        exception_msg = "Unable to activate {}".format(self.routed_conn_name)

        try:
            self.create_connection(
                self.routed_conn_name, exception_msg,
                subprocess_command, exceptions.CreateRoutedKillswitchError
            )
        except exceptions.CreateRoutedKillswitchError as e:
            if e.additional_context.returncode == 2 and not try_route_addrs:
                return self.create_routed_connection(server_ip, True)
            else:
                raise exceptions.CreateRoutedKillswitchError(exception_msg)

    def create_connection(
        self, conn_name, exception_msg,
        subprocess_command, exception
    ):
        self.update_connection_status()
        if not self.interface_state_tracker[conn_name][
            KillSwitchInterfaceTrackerEnum.EXISTS
        ]:
            self.run_subprocess(
                exception,
                exception_msg,
                subprocess_command
            )

    def activate_connection(self, conn_name):
        """Activate a connection based on connection name.

        Args:
            conn_name (string): connection name (uid)
        """
        self.update_connection_status()
        conn_dict = self.nm_wrapper.search_for_connection( # noqa
            conn_name,
            return_device_path=True,
            return_settings_path=True
        )
        if (
            self.interface_state_tracker[conn_name][
                KillSwitchInterfaceTrackerEnum.EXISTS
            ]
        ) and (
            not self.interface_state_tracker[conn_name][
                KillSwitchInterfaceTrackerEnum.IS_RUNNING
            ]
        ) and conn_dict:
            device_path = str(conn_dict.get("device_path"))
            settings_path = str(conn_dict.get("settings_path"))

            try:
                active_conn = self.nm_wrapper.activate_connection(
                    settings_path, device_path
                )
            except dbus.exceptions.DBusException as e:
                logger.exception(e)
                raise exceptions.ActivateKillswitchError(
                    "Unable to activate {}".format(conn_name)
                )
            else:
                if active_conn:
                    return
                logger.error(
                    "Dbus returned empty, unable to activate connection"
                )
                raise exceptions.ActivateKillswitchError(
                    "Unable to activate {}".format(conn_name)
                )

    def deactivate_connection(self, conn_name):
        """Deactivate a connection based on connection name.

        Args:
            conn_name (string): connection name (uid)
        """
        self.update_connection_status()
        active_conn_dict = self.nm_wrapper.search_for_connection( # noqa
            conn_name, is_active=True,
            return_active_conn_path=True
        )
        if (
            self.interface_state_tracker[conn_name][
                KillSwitchInterfaceTrackerEnum.IS_RUNNING
            ] and active_conn_dict
        ):
            active_conn_path = str(active_conn_dict.get("active_conn_path"))
            try:
                self.nm_wrapper.disconnect_connection(
                    active_conn_path
                )
            except dbus.exceptions.DBusException as e:
                logger.exception(e)
                raise exceptions.DectivateKillswitchError(
                    "Unable to deactivate {}".format(conn_name)
                )

    def delete_connection(self, conn_name):
        """Delete a connection based on connection name.

        If it fails to delete the connection, it will attempt to deactivate it.

        Args:
            conn_name (string): connection name (uid)
        """
        subprocess_command = ""\
            "nmcli c delete {}".format(conn_name).split(" ")

        self.update_connection_status()
        if self.interface_state_tracker[conn_name][KillSwitchInterfaceTrackerEnum.EXISTS]: # noqa
            self.run_subprocess(
                exceptions.DeleteKillswitchError,
                "Unable to delete {}".format(conn_name),
                subprocess_command
            )

    def deactivate_all_connections(self):
        """Deactivate all connections."""
        self.deactivate_connection(self.ks_conn_name)
        self.deactivate_connection(self.routed_conn_name)

    def delete_all_connections(self, _=None):
        """Delete all connections."""
        self.delete_connection(self.ks_conn_name)
        self.delete_connection(self.routed_conn_name)

    def update_connection_status(self):
        """Update connection/interface status."""
        all_conns = self.nm_wrapper.get_all_connections()
        active_conns = self.nm_wrapper.get_all_active_connections()
        self.interface_state_tracker[self.ks_conn_name][KillSwitchInterfaceTrackerEnum.EXISTS] = False # noqa
        self.interface_state_tracker[self.routed_conn_name][KillSwitchInterfaceTrackerEnum.EXISTS] = False  # noqa
        self.interface_state_tracker[self.ks_conn_name][KillSwitchInterfaceTrackerEnum.IS_RUNNING] = False # noqa
        self.interface_state_tracker[self.routed_conn_name][KillSwitchInterfaceTrackerEnum.IS_RUNNING] = False  # noqa

        for conn in all_conns:
            try:
                conn_name = str(self.nm_wrapper.get_settings_from_connection(
                    conn
                )["connection"]["id"])
            except dbus.exceptions.DBusException:
                conn_name = "None"

            if conn_name in self.interface_state_tracker:
                self.interface_state_tracker[conn_name][
                    KillSwitchInterfaceTrackerEnum.EXISTS
                ] = True

        for active_conn in active_conns:
            try:
                conn_name = str(self.nm_wrapper.get_active_connection_properties(
                    active_conn
                )["Id"])
            except dbus.exceptions.DBusException:
                conn_name = "None"

            if conn_name in self.interface_state_tracker:
                self.interface_state_tracker[conn_name][
                    KillSwitchInterfaceTrackerEnum.IS_RUNNING
                ] = True

        logger.info("Tracker info: {}".format(self.interface_state_tracker))

    def run_subprocess(self, exception, exception_msg, *args):
        """Run provided input via subprocess.

        Args:
            exception (exceptions.KillswitchError): exception based on action
            exception_msg (string): exception message
            *args (list): arguments to be passed to subprocess
        """
        subprocess_outpout = subprocess.run(
            *args, stderr=subprocess.PIPE, stdout=subprocess.PIPE
        )

        if (
            subprocess_outpout.returncode != 0
            and subprocess_outpout.returncode != 10
        ):
            logger.error(
                "Interface state tracker: {}".format(
                    self.interface_state_tracker
                )
            )
            logger.error(
                "{}: {}. Raising exception.".format(
                    exception,
                    subprocess_outpout
                )
            )
            raise exception(
                exception_msg,
                subprocess_outpout
            )

    def _ensure_connectivity_check_is_disabled(self):
        conn_check = self.connectivity_check()

        if len(conn_check) > 0:
            logger.info("Attempting to disable connectivity check")
            self.disable_connectivity_check(
                conn_check[0], conn_check[1]
            )

    def connectivity_check(self):
        (
            is_conn_check_available,
            is_conn_check_enabled,
        ) = self.get_status_connectivity_check()

        if not is_conn_check_enabled:
            return tuple()

        if not is_conn_check_available:
            logger.error(
                "AvailableConnectivityCheckError: "
                + "Unable to change connectivity check for killswitch."
                + "Raising exception."
            )
            raise exceptions.AvailableConnectivityCheckError(
                "Unable to change connectivity check for killswitch"
            )

        return is_conn_check_available, is_conn_check_enabled

    def get_status_connectivity_check(self):
        """Check status of NM connectivity check."""
        nm_props = self.nm_wrapper.get_network_manager_properties()
        is_conn_check_available = nm_props["ConnectivityCheckAvailable"]
        is_conn_check_enabled = nm_props["ConnectivityCheckEnabled"]

        logger.info(
            "Conn check available ({}) - Conn check enabled ({})".format(
                is_conn_check_available,
                is_conn_check_enabled
            )
        )

        return is_conn_check_available, is_conn_check_enabled

    def disable_connectivity_check(
        self, is_conn_check_available, is_conn_check_enabled
    ):
        """Disable NetworkManager connectivity check."""
        if is_conn_check_enabled:
            logger.info("Disabling connectivity check")
            nm_methods = self.nm_wrapper.get_network_manager_properties_interface()
            nm_methods.Set(
                "org.freedesktop.NetworkManager",
                "ConnectivityCheckEnabled",
                False
            )
            nm_props = self.nm_wrapper.get_network_manager_properties()
            if nm_props["ConnectivityCheckEnabled"]:
                logger.error(
                    "DisableConnectivityCheckError: "
                    + "Can not disable connectivity check for killswitch."
                    + "Raising exception."
                )
                raise exceptions.DisableConnectivityCheckError(
                    "Can not disable connectivity check for killswitch"
                )

            logger.info("Check connectivity has been 'disabled'")

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 136 B 0644
ipv6_leak_protection.py File 10.77 KB 0644
killswitch.py File 23.43 KB 0644