404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.139.94.189: ~ $
import os

import dbus
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
from protonvpn_nm_lib.constants import VIRTUAL_DEVICE_NAME
from protonvpn_nm_lib.core.environment import ExecutionEnvironment
from protonvpn_nm_lib.daemon.daemon_logger import logger
from protonvpn_nm_lib.enums import (KillSwitchActionEnum, KillswitchStatusEnum,
                                    VPNConnectionReasonEnum,
                                    VPNConnectionStateEnum)

# Build the execution environment once at import time and expose the pieces
# the reconnector uses as module-level names.
env = ExecutionEnvironment()
# Supplies the server IP to reconnect to and records the connect time.
connection_metadata = env.connection_metadata
# Kill switch manager (pre/post connection handling and cleanup).
killswitch = env.killswitch
# IPv6 leak protection manager.
ipv6_leak_protection = env.ipv6leak
# User settings; only the kill switch setting is read in this module.
settings = env.settings

from protonvpn_nm_lib.core.dbus.dbus_login1_wrapper import Login1UnitWrapper
from protonvpn_nm_lib.core.dbus.dbus_network_manager_wrapper import \
    NetworkManagerUnitWrapper


class ProtonVPNReconnector:
    """Reconnects to the VPN if it was disconnected by something other than
    the user, or when connecting to a new network.

    Params:
        virtual_device_name (string): Name of the virtual device that will be
            used by ProtonVPNReconnector
        loop (GLib.MainLoop): Main loop the daemon runs in
        max_attempts (int): Maximum number of attempts to reconnect the VPN
            session on failures
        delay (int): Milliseconds to wait before reconnecting the VPN

    """
    def __init__(self, virtual_device_name, loop, max_attempts=100, delay=5000): # noqa
        logger.info(
            "\n\n------------------------"
            " Initializing Dbus daemon manager "
            "------------------------\n"
        )
        self.virtual_device_name = virtual_device_name
        self.loop = loop
        self.max_attempts = max_attempts
        self.delay = delay
        self.failed_attempts = 0
        self.bus = dbus.SystemBus()
        self.nm_wrapper = NetworkManagerUnitWrapper(self.bus)
        self.login1_wrapper = Login1UnitWrapper(self.bus)
        self.is_user_session_locked = False
        self.suspend_lock = None
        self.shutdown_lock = None
        # Auto connect at startup (Listen for StateChanged going forward)
        self.vpn_activator()
        self.connect_signals()

    def connect_signals(self):
        self.login1_wrapper.connect_user_session_object_to_signal(
            "Lock", self.on_session_lock
        )
        self.login1_wrapper.connect_user_session_object_to_signal(
            "Unlock", self.on_session_unlock
        )
        self.login1_wrapper.connect_login1_object_to_signal(
            "PrepareForShutdown", self.on_prepare_for_shutdown
        )
        self.login1_wrapper.connect_login1_object_to_signal(
            "PrepareForSleep", self.on_prepare_for_suspend
        )
        self.nm_wrapper.connect_network_manager_object_to_signal(
            "StateChanged", self.on_network_state_changed
        )
        self._create_on_suspend_lock()
        self._create_on_shutdown_lock()

        self.is_user_session_locked = \
            False \
            if self.login1_wrapper.get_properties_current_user_session()["State"] == "active" \
            else True

    def _create_on_suspend_lock(self):
        if self.suspend_lock:
            return

        login_manager_interface = self.login1_wrapper.get_login_manager_interface()
        try:
            logger.info("Create sleep inhibit lock")
            self.suspend_lock = login_manager_interface.Inhibit(
                "sleep", "ProtonVPN", "Update session lock status", "delay"
            ).take()
            logger.info("Sleep lock created: {} {}".format(
                self.suspend_lock, type(self.suspend_lock)
            ))
        except Exception as e:
            logger.exception(e)

    def _create_on_shutdown_lock(self):
        if self.shutdown_lock:
            return

        login_manager_interface = self.login1_wrapper.get_login_manager_interface()
        try:
            logger.info("Create shutdown inhibit lock")
            self.shutdown_lock = login_manager_interface.Inhibit(
                "shutdown", "ProtonVPN", "Remove VPN interfaces", "delay"
            ).take()
            logger.info("Shutdown lock created: {} {}".format(
                self.shutdown_lock, type(self.shutdown_lock)
            ))
        except Exception as e:
            logger.exception(e)

    def on_session_lock(self):
        self.is_user_session_locked = True
        logger.info("Session state: \"{}\"".format("Locked" if self.is_user_session_locked else "Unlocked"))

    def on_session_unlock(self):
        self.is_user_session_locked = False
        logger.info("Session state: \"{}\"".format("Locked" if self.is_user_session_locked else "Unlocked"))
        self.vpn_activator()

    def on_prepare_for_shutdown(self, *args, **kwargs):
        logger.info("Preparing for shutdown")

        if settings.killswitch != KillswitchStatusEnum.HARD:
            logger.info("Remove Kill Switch interface")
            killswitch.delete_all_connections()

        logger.info("Remove IPv6 leak protection")
        ipv6_leak_protection.remove_leak_protection()

        if not self.shutdown_lock:
            return

        logger.info(
            "Attempting to release shutdown lock: {} {}".format(
                self.shutdown_lock, type(self.shutdown_lock)
            )
        )
        try:
            os.close(self.shutdown_lock)
            self.shutdown_lock = None
        except Exception as e:
            logger.exception(e)
            return

        logger.info("Successuflly released shutdown lock")

    def on_prepare_for_suspend(self, *args, **kwargs):
        logger.info("Preparing for sleep")

        self.is_user_session_locked = True
        logger.info("Session state: \"{}\"".format("Locked" if self.is_user_session_locked else "Unlocked"))

        if not self.suspend_lock:
            return

        logger.info(
            "Attempting to release suspend lock: {} {}".format(
                self.suspend_lock, type(self.suspend_lock)
            )
        )

        try:
            os.close(self.suspend_lock)
            self.suspend_lock = None
        except Exception as e:
            logger.exception(e)
            return

        logger.info("Successuflly released suspend lock")

    def on_network_state_changed(self, state):
        """Network status signal handler.

        Args:
            state (int): connection state (NMState)
        """
        logger.info("Network state changed: {}".format(state))
        if state == 70:
            self.vpn_activator()

    def on_vpn_state_changed(self, state, reason):
        """VPN status signal handler.

        Args:
            state (int): NMVpnConnectionState
            reason (int): NMActiveConnectionStateReason
        """
        state = VPNConnectionStateEnum(state)
        reason = VPNConnectionReasonEnum(reason)
        logger.info(
            "State: {} - ".format(state)
            + "Reason: {}".format(
                reason
            )
        )
        if state == VPNConnectionStateEnum.IS_ACTIVE and not self.is_user_session_locked:
            logger.info(
                "Proton VPN with virtual device '{}' is running.".format(
                    self.virtual_device_name
                )
            )
            self.failed_attempts = 0

            connection_metadata.save_connect_time()

            try:
                if ipv6_leak_protection.enable_ipv6_leak_protection:
                    ipv6_leak_protection.manage(
                        KillSwitchActionEnum.ENABLE
                    )
            except: # noqa
                pass

            if (
                settings.killswitch
                != KillswitchStatusEnum.DISABLED
            ):
                killswitch.manage(
                    KillSwitchActionEnum.POST_CONNECTION
                )
                logger.info("Running killswitch post-conneciton mode")

        elif (
            state == VPNConnectionStateEnum.DISCONNECTED
            and reason == VPNConnectionReasonEnum.USER_HAS_DISCONNECTED
            and not self.is_user_session_locked
        ):
            logger.info("Proton VPN connection was manually disconnected.")
            self.failed_attempts = 0

            try:
                vpn_iface = self.nm_wrapper.get_vpn_interface()
            except TypeError as e:
                logger.exception(e)

            try:
                vpn_iface.Delete()
            except dbus.exceptions.DBusException as e:
                logger.error(
                    "Unable to remove connection. "
                    + "Exception: {}".format(e)
                )
            except AttributeError:
                pass

            logger.info("Proton VPN connection has been manually removed.")

            try:
                ipv6_leak_protection.manage(
                    KillSwitchActionEnum.DISABLE
                )
            except: # noqa
                pass

            if (
                settings.killswitch
                != KillswitchStatusEnum.HARD
            ):
                killswitch.delete_all_connections()

            loop.quit()

        elif state in [
            VPNConnectionStateEnum.FAILED,
            VPNConnectionStateEnum.DISCONNECTED
        ] and not self.is_user_session_locked:
            # reconnect if haven't reached max_attempts
            if (
                not self.max_attempts
            ) or (
                self.failed_attempts < self.max_attempts
            ):
                logger.info("Connection failed, attempting to reconnect.")
                self.failed_attempts += 1
                glib_reconnect = True
                GLib.timeout_add(
                    self.delay, self.vpn_activator, glib_reconnect
                )
            else:
                logger.warning(
                    "Connection failed, exceeded {} max attempts.".format(
                        self.max_attempts
                    )
                )

    def setup_protonvpn_conn(self, active_connection, vpn_interface):
        """Setup and start new Proton VPN connection.

        Args:
            active_connection (string): path to active connection
            vpn_interface (dbus.Proxy): proxy interface to vpn connection
        """
        logger.info(
            "Setting up Proton VPN connecton: {} {}".format(
                active_connection, vpn_interface
            )
        )
        new_con = self.nm_wrapper.activate_connection(
            vpn_interface,
            dbus.ObjectPath("/"),
            active_connection
        )
        self.vpn_signal_handler(new_con)
        logger.info(
            "Starting manually Proton VPN connection with '{}'.".format(
                self.virtual_device_name
            )
        )

    def manually_start_vpn_conn(self, server_ip, vpn_interface):
        logger.info("User ks setting: {}".format(
            settings.killswitch
        ))
        if (
            settings.killswitch
            != KillswitchStatusEnum.DISABLED
        ):
            try:
                killswitch.manage(
                    KillSwitchActionEnum.PRE_CONNECTION,
                    server_ip=server_ip
                )
            except Exception as e:
                logger.exception(
                    "KS manager reconnect exception: {}".format(e)
                )
                return False
        logger.info("Created routed interface")

        try:
            new_active_connection = self.nm_wrapper.get_active_connection()
        except (dbus.exceptions.DBusException, Exception) as e:
            logger.exception(e)
            new_active_connection = None

        logger.info(
            "Active conn prior to "
            "setup manual connection: {} {}".format(
                new_active_connection,
                type(new_active_connection)
            )
        )

        if not new_active_connection:
            logger.info("No active connection, retrying reconnect")
            return False
        else:
            logger.info("Setting up manual connection")

            try:
                self.setup_protonvpn_conn(
                    new_active_connection, vpn_interface
                )
            except dbus.exceptions.DBusException as e:
                logger.exception(
                    "Unable to start VPN connection: {}.".format(e)
                )
                return False
            except Exception as e:
                logger.exception(
                    "Unknown reconnector error: {}.".format(e)
                )
                return False

            logger.info(
                "New Proton VPN connection has been started "
                + "from service."
            )
            return True

    def vpn_activator(self, glib_reconnect=False):
        """Monitor and activate Proton VPN connections."""
        logger.info(
            "\n\n------- "
            "VPN Activator"
            " -------\n"
            + "Virtual device being monitored: {}; ".format(
                self.virtual_device_name
            ) + "Attempt {}/{} with interval of {} ".format(
                self.failed_attempts, self.max_attempts, self.delay
            ) + "ms;\n"
        )
        if self.is_user_session_locked:
            return

        vpn_interface = self.nm_wrapper.get_vpn_interface()

        try:
            active_connection = self.nm_wrapper.get_active_connection()
        except (dbus.exceptions.DBusException, Exception) as e:
            logger.exception(e)
            active_connection = None

        logger.info("VPN interface: {}".format(vpn_interface))
        logger.info("Active connection: {}".format(active_connection))

        if active_connection is None or vpn_interface is None:
            if not glib_reconnect:
                logger.info("Calling manually on vpn state changed")
                self.on_vpn_state_changed(
                    VPNConnectionStateEnum.FAILED,
                    VPNConnectionReasonEnum.UNKNOWN
                )
            else:
                return True

        (
            is_active_conn_vpn,
            all_vpn_settings
        ) = self.nm_wrapper.check_active_vpn_connection(
            active_connection
        )

        # Check if primary active connection was started by Proton VPN client
        if (
            is_active_conn_vpn
        ) and (
            all_vpn_settings["vpn"]["data"]["dev"]
            == self.virtual_device_name
        ):
            logger.info("Primary connection via Proton VPN.")
            self.vpn_signal_handler(active_connection)
            return False

        server_ip = connection_metadata.get_server_ip()
        logger.info("Reconnecting to server IP \"{}\"".format(server_ip))

        try:
            (
                is_protonvpn, state, conn
            ) = self.nm_wrapper.is_protonvpn_being_prepared()
        except dbus.exceptions.DBusException as e:
            logger.exception(e)
        else:

            # Check if connection is being prepared
            if is_protonvpn and state == 1:
                logger.info("Proton VPN connection is being prepared.")
                if (
                    settings.killswitch
                    != KillswitchStatusEnum.DISABLED
                ):
                    killswitch.manage(
                        KillSwitchActionEnum.PRE_CONNECTION,
                        server_ip=server_ip
                    )
                self.vpn_signal_handler(conn)
                return False

        if not self.manually_start_vpn_conn(server_ip, vpn_interface):
            if not glib_reconnect:
                logger.info("Calling manually on vpn state changed")
                self.on_vpn_state_changed(
                    VPNConnectionStateEnum.FAILED,
                    VPNConnectionReasonEnum.UNKNOWN
                )
            else:
                return True

    def vpn_signal_handler(self, conn):
        """Add signal handler to Proton VPN connection.

        Args:
            vpn_conn_path (string): path to Proton VPN connection
        """
        proxy = self.bus.get_object(
            "org.freedesktop.NetworkManager", conn
        )
        iface = dbus.Interface(
            proxy, "org.freedesktop.NetworkManager.VPN.Connection"
        )

        try:
            active_conn_props = self.nm_wrapper.get_active_connection_properties(conn)
            logger.info("Adding listener to active {} connection at {}".format(
                active_conn_props["Id"],
                conn)
            )
        except dbus.exceptions.DBusException:
            logger.info(
                "{} is not an active connection.".format(conn)
            )
        except Exception as e:
            logger.info(
                "Unknown add signal error: {}".format(e)
            )
        else:
            logger.info("Listener added")
            iface.connect_to_signal(
                "VpnStateChanged", self.on_vpn_state_changed
            )


# Daemon entry point, executed at import time.
# The GLib main loop is registered as the default D-Bus main loop before the
# reconnector is constructed and opens its system bus connection.
DBusGMainLoop(set_as_default=True)
# Module-level on purpose: on_vpn_state_changed() refers to this `loop` name
# when it quits after a user-initiated disconnect.
loop = GLib.MainLoop()
# The constructor already runs the first activation attempt and connects all
# signal handlers.
ins = ProtonVPNReconnector(VIRTUAL_DEVICE_NAME, loop)
# Blocks here, dispatching signals, until the loop is quit.
loop.run()

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 0 B 0644
daemon_logger.py File 1.01 KB 0644
dbus_daemon_reconnector.py File 16.69 KB 0644