404

[ Avaa Bypassed ]




Upload:

Command:

botdev@18.222.30.84: ~ $
"""Test certbot.reverter."""
import csv
import logging
import os
import shutil
import tempfile
import unittest

import mock
import six

from certbot import errors

from certbot.tests import util as test_util


class ReverterCheckpointLocalTest(test_util.ConfigTestCase):
    # pylint: disable=too-many-instance-attributes, too-many-public-methods
    """Test the Reverter Class."""
    def setUp(self):
        super(ReverterCheckpointLocalTest, self).setUp()
        from certbot.reverter import Reverter

        # Disable spurious errors... we are trying to test for them
        logging.disable(logging.CRITICAL)

        self.reverter = Reverter(self.config)

        tup = setup_test_files()
        self.config1, self.config2, self.dir1, self.dir2, self.sets = tup

    def tearDown(self):
        shutil.rmtree(self.config.work_dir)
        shutil.rmtree(self.dir1)
        shutil.rmtree(self.dir2)

        logging.disable(logging.NOTSET)

    @mock.patch("certbot.reverter.Reverter._read_and_append")
    def test_no_change(self, mock_read):
        mock_read.side_effect = OSError("cannot even")
        try:
            self.reverter.add_to_checkpoint(self.sets[0], "save1")
        except OSError:
            pass
        self.reverter.finalize_checkpoint("blah")
        path = os.listdir(self.reverter.config.backup_dir)[0]
        no_change = os.path.join(self.reverter.config.backup_dir, path, "CHANGES_SINCE")
        with open(no_change, "r") as f:
            x = f.read()
        self.assertTrue("No changes" in x)

    @test_util.broken_on_windows
    def test_basic_add_to_temp_checkpoint(self):
        # These shouldn't conflict even though they are both named config.txt
        self.reverter.add_to_temp_checkpoint(self.sets[0], "save1")
        self.reverter.add_to_temp_checkpoint(self.sets[1], "save2")

        self.assertTrue(os.path.isdir(self.config.temp_checkpoint_dir))
        self.assertEqual(get_save_notes(
            self.config.temp_checkpoint_dir), "save1save2")
        self.assertFalse(os.path.isfile(
            os.path.join(self.config.temp_checkpoint_dir, "NEW_FILES")))

        self.assertEqual(
            get_filepaths(self.config.temp_checkpoint_dir),
            "{0}\n{1}\n".format(self.config1, self.config2))

    def test_add_to_checkpoint_copy_failure(self):
        with mock.patch("certbot.reverter.shutil.copy2") as mock_copy2:
            mock_copy2.side_effect = IOError("bad copy")
            self.assertRaises(
                errors.ReverterError, self.reverter.add_to_checkpoint,
                self.sets[0], "save1")

    def test_checkpoint_conflict(self):
        """Make sure that checkpoint errors are thrown appropriately."""
        config3 = os.path.join(self.dir1, "config3.txt")
        self.reverter.register_file_creation(True, config3)
        update_file(config3, "This is a new file!")

        self.reverter.add_to_checkpoint(self.sets[2], "save1")
        # This shouldn't throw an error
        self.reverter.add_to_temp_checkpoint(self.sets[0], "save2")
        # Raise error
        self.assertRaises(errors.ReverterError, self.reverter.add_to_checkpoint,
                          self.sets[2], "save3")
        # Should not cause an error
        self.reverter.add_to_checkpoint(self.sets[1], "save4")

        # Check to make sure new files are also checked...
        self.assertRaises(errors.ReverterError, self.reverter.add_to_checkpoint,
                          set([config3]), "invalid save")

    @test_util.broken_on_windows
    def test_multiple_saves_and_temp_revert(self):
        self.reverter.add_to_temp_checkpoint(self.sets[0], "save1")
        update_file(self.config1, "updated-directive")
        self.reverter.add_to_temp_checkpoint(self.sets[0], "save2-updated dir")
        update_file(self.config1, "new directive change that we won't keep")

        self.reverter.revert_temporary_config()
        self.assertEqual(read_in(self.config1), "directive-dir1")

    def test_multiple_registration_fail_and_revert(self):

        config3 = os.path.join(self.dir1, "config3.txt")
        update_file(config3, "Config3")
        config4 = os.path.join(self.dir2, "config4.txt")
        update_file(config4, "Config4")

        # Test multiple registrations and two registrations at once
        self.reverter.register_file_creation(True, self.config1)
        self.reverter.register_file_creation(True, self.config2)
        self.reverter.register_file_creation(True, config3, config4)

        # Simulate Certbot crash... recovery routine is run
        self.reverter.recovery_routine()

        self.assertFalse(os.path.isfile(self.config1))
        self.assertFalse(os.path.isfile(self.config2))
        self.assertFalse(os.path.isfile(config3))
        self.assertFalse(os.path.isfile(config4))

    @test_util.broken_on_windows
    def test_multiple_registration_same_file(self):
        self.reverter.register_file_creation(True, self.config1)
        self.reverter.register_file_creation(True, self.config1)
        self.reverter.register_file_creation(True, self.config1)
        self.reverter.register_file_creation(True, self.config1)

        files = get_new_files(self.config.temp_checkpoint_dir)

        self.assertEqual(len(files), 1)

    def test_register_file_creation_write_error(self):
        m_open = mock.mock_open()
        with mock.patch("certbot.reverter.open", m_open, create=True):
            m_open.side_effect = OSError("bad open")
            self.assertRaises(
                errors.ReverterError, self.reverter.register_file_creation,
                True, self.config1)

    def test_bad_registration(self):
        # Made this mistake and want to make sure it doesn't happen again...
        self.assertRaises(
            errors.ReverterError, self.reverter.register_file_creation,
            "filepath")

    @test_util.broken_on_windows
    def test_register_undo_command(self):
        coms = [
            ["a2dismod", "ssl"],
            ["a2dismod", "rewrite"],
            ["cleanslate"]
        ]
        for com in coms:
            self.reverter.register_undo_command(True, com)

        act_coms = get_undo_commands(self.config.temp_checkpoint_dir)

        for a_com, com in six.moves.zip(act_coms, coms):
            self.assertEqual(a_com, com)

    def test_bad_register_undo_command(self):
        m_open = mock.mock_open()
        with mock.patch("certbot.reverter.open", m_open, create=True):
            m_open.side_effect = OSError("bad open")
            self.assertRaises(
                errors.ReverterError, self.reverter.register_undo_command,
                True, ["command"])

    @test_util.broken_on_windows
    @mock.patch("certbot.util.run_script")
    def test_run_undo_commands(self, mock_run):
        mock_run.side_effect = ["", errors.SubprocessError]
        coms = [
            ["invalid_command"],
            ["a2dismod", "ssl"],
        ]
        for com in coms:
            self.reverter.register_undo_command(True, com)

        self.reverter.revert_temporary_config()

        self.assertEqual(mock_run.call_count, 2)

    def test_recovery_routine_in_progress_failure(self):
        self.reverter.add_to_checkpoint(self.sets[0], "perm save")

        # pylint: disable=protected-access
        self.reverter._recover_checkpoint = mock.MagicMock(
            side_effect=errors.ReverterError)
        self.assertRaises(errors.ReverterError, self.reverter.recovery_routine)

    def test_recover_checkpoint_revert_temp_failures(self):

        mock_recover = mock.MagicMock(
            side_effect=errors.ReverterError("e"))

        # pylint: disable=protected-access
        self.reverter._recover_checkpoint = mock_recover

        self.reverter.add_to_temp_checkpoint(self.sets[0], "config1 save")

        self.assertRaises(
            errors.ReverterError, self.reverter.revert_temporary_config)

    def test_recover_checkpoint_rollback_failure(self):
        mock_recover = mock.MagicMock(
            side_effect=errors.ReverterError("e"))
        # pylint: disable=protected-access
        self.reverter._recover_checkpoint = mock_recover

        self.reverter.add_to_checkpoint(self.sets[0], "config1 save")
        self.reverter.finalize_checkpoint("Title")

        self.assertRaises(
            errors.ReverterError, self.reverter.rollback_checkpoints, 1)

    def test_recover_checkpoint_copy_failure(self):
        self.reverter.add_to_temp_checkpoint(self.sets[0], "save1")

        with mock.patch("certbot.reverter.shutil.copy2") as mock_copy2:
            mock_copy2.side_effect = OSError("bad copy")
            self.assertRaises(
                errors.ReverterError, self.reverter.revert_temporary_config)

    def test_recover_checkpoint_rm_failure(self):
        self.reverter.add_to_temp_checkpoint(self.sets[0], "temp save")

        with mock.patch("certbot.reverter.shutil.rmtree") as mock_rmtree:
            mock_rmtree.side_effect = OSError("Cannot remove tree")
            self.assertRaises(
                errors.ReverterError, self.reverter.revert_temporary_config)

    @test_util.broken_on_windows
    @mock.patch("certbot.reverter.logger.warning")
    def test_recover_checkpoint_missing_new_files(self, mock_warn):
        self.reverter.register_file_creation(
            True, os.path.join(self.dir1, "missing_file.txt"))
        self.reverter.revert_temporary_config()
        self.assertEqual(mock_warn.call_count, 1)

    @mock.patch("certbot.reverter.os.remove")
    def test_recover_checkpoint_remove_failure(self, mock_remove):
        self.reverter.register_file_creation(True, self.config1)
        mock_remove.side_effect = OSError("Can't remove")
        self.assertRaises(
            errors.ReverterError, self.reverter.revert_temporary_config)

    @test_util.broken_on_windows
    def test_recovery_routine_temp_and_perm(self):
        # Register a new perm checkpoint file
        config3 = os.path.join(self.dir1, "config3.txt")
        self.reverter.register_file_creation(False, config3)
        update_file(config3, "This is a new perm file!")

        # Add changes to perm checkpoint
        self.reverter.add_to_checkpoint(self.sets[0], "perm save1")
        update_file(self.config1, "updated perm config1")
        self.reverter.add_to_checkpoint(self.sets[1], "perm save2")
        update_file(self.config2, "updated perm config2")

        # Add changes to a temporary checkpoint
        self.reverter.add_to_temp_checkpoint(self.sets[0], "temp save1")
        update_file(self.config1, "second update now temp config1")

        # Register a new temp checkpoint file
        config4 = os.path.join(self.dir2, "config4.txt")
        self.reverter.register_file_creation(True, config4)
        update_file(config4, "New temporary file!")

        # Now erase everything
        self.reverter.recovery_routine()

        # Now Run tests
        # These were new files.. they should be removed
        self.assertFalse(os.path.isfile(config3))
        self.assertFalse(os.path.isfile(config4))

        # Check to make sure everything got rolled back appropriately
        self.assertEqual(read_in(self.config1), "directive-dir1")
        self.assertEqual(read_in(self.config2), "directive-dir2")


class TestFullCheckpointsReverter(test_util.ConfigTestCase):
    # pylint: disable=too-many-instance-attributes
    """Tests functions having to deal with full checkpoints."""
    def setUp(self):
        super(TestFullCheckpointsReverter, self).setUp()
        from certbot.reverter import Reverter
        # Disable spurious errors...
        logging.disable(logging.CRITICAL)

        self.reverter = Reverter(self.config)

        tup = setup_test_files()
        self.config1, self.config2, self.dir1, self.dir2, self.sets = tup

    def tearDown(self):
        shutil.rmtree(self.config.work_dir)
        shutil.rmtree(self.dir1)
        shutil.rmtree(self.dir2)

        logging.disable(logging.NOTSET)

    def test_rollback_improper_inputs(self):
        self.assertRaises(
            errors.ReverterError, self.reverter.rollback_checkpoints, "-1")
        self.assertRaises(
            errors.ReverterError, self.reverter.rollback_checkpoints, -1000)
        self.assertRaises(
            errors.ReverterError, self.reverter.rollback_checkpoints, "one")

    @test_util.broken_on_windows
    def test_rollback_finalize_checkpoint_valid_inputs(self):

        config3 = self._setup_three_checkpoints()

        # Check resulting backup directory
        self.assertEqual(len(os.listdir(self.config.backup_dir)), 3)
        # Check rollbacks
        # First rollback
        self.reverter.rollback_checkpoints(1)
        self.assertEqual(read_in(self.config1), "update config1")
        self.assertEqual(read_in(self.config2), "update config2")
        # config3 was not included in checkpoint
        self.assertEqual(read_in(config3), "Final form config3")

        # Second rollback
        self.reverter.rollback_checkpoints(1)
        self.assertEqual(read_in(self.config1), "update config1")
        self.assertEqual(read_in(self.config2), "directive-dir2")
        self.assertFalse(os.path.isfile(config3))

        # One dir left... check title
        all_dirs = os.listdir(self.config.backup_dir)
        self.assertEqual(len(all_dirs), 1)
        self.assertTrue(
            "First Checkpoint" in get_save_notes(
                os.path.join(self.config.backup_dir, all_dirs[0])))
        # Final rollback
        self.reverter.rollback_checkpoints(1)
        self.assertEqual(read_in(self.config1), "directive-dir1")

    def test_finalize_checkpoint_no_in_progress(self):
        # No need to warn for this... just make sure there are no errors.
        self.reverter.finalize_checkpoint("No checkpoint...")

    @mock.patch("certbot.reverter.shutil.move")
    def test_finalize_checkpoint_cannot_title(self, mock_move):
        self.reverter.add_to_checkpoint(self.sets[0], "perm save")
        mock_move.side_effect = OSError("cannot move")

        self.assertRaises(
            errors.ReverterError, self.reverter.finalize_checkpoint, "Title")

    @mock.patch("certbot.reverter.compat.os_rename")
    def test_finalize_checkpoint_no_rename_directory(self, mock_rename):

        self.reverter.add_to_checkpoint(self.sets[0], "perm save")
        mock_rename.side_effect = OSError

        self.assertRaises(
            errors.ReverterError, self.reverter.finalize_checkpoint, "Title")

    @test_util.broken_on_windows
    @mock.patch("certbot.reverter.logger")
    def test_rollback_too_many(self, mock_logger):
        # Test no exist warning...
        self.reverter.rollback_checkpoints(1)
        self.assertEqual(mock_logger.warning.call_count, 1)

        # Test Generic warning
        self._setup_three_checkpoints()
        mock_logger.warning.call_count = 0
        self.reverter.rollback_checkpoints(4)
        self.assertEqual(mock_logger.warning.call_count, 1)

    @test_util.broken_on_windows
    def test_multi_rollback(self):
        config3 = self._setup_three_checkpoints()
        self.reverter.rollback_checkpoints(3)

        self.assertEqual(read_in(self.config1), "directive-dir1")
        self.assertEqual(read_in(self.config2), "directive-dir2")
        self.assertFalse(os.path.isfile(config3))

    @test_util.patch_get_utility()
    def test_view_config_changes(self, mock_output):
        """This is not strict as this is subject to change."""
        self._setup_three_checkpoints()

        # Make sure it doesn't throw any errors
        self.reverter.view_config_changes()

        # Make sure notification is output
        self.assertEqual(mock_output().notification.call_count, 1)

    @mock.patch("certbot.reverter.logger")
    def test_view_config_changes_no_backups(self, mock_logger):
        self.reverter.view_config_changes()
        self.assertTrue(mock_logger.info.call_count > 0)

    def test_view_config_changes_bad_backups_dir(self):
        # There shouldn't be any "in progress directories when this is called
        # It must just be clean checkpoints
        os.makedirs(os.path.join(self.config.backup_dir, "in_progress"))

        self.assertRaises(
            errors.ReverterError, self.reverter.view_config_changes)

    def test_view_config_changes_for_logging(self):
        self._setup_three_checkpoints()

        config_changes = self.reverter.view_config_changes(for_logging=True)

        self.assertTrue("First Checkpoint" in config_changes)
        self.assertTrue("Second Checkpoint" in config_changes)
        self.assertTrue("Third Checkpoint" in config_changes)

    def _setup_three_checkpoints(self):
        """Generate some finalized checkpoints."""
        # Checkpoint1 - config1
        self.reverter.add_to_checkpoint(self.sets[0], "first save")
        self.reverter.finalize_checkpoint("First Checkpoint")

        update_file(self.config1, "update config1")

        # Checkpoint2 - new file config3, update config2
        config3 = os.path.join(self.dir1, "config3.txt")
        self.reverter.register_file_creation(False, config3)
        update_file(config3, "directive-config3")
        self.reverter.add_to_checkpoint(self.sets[1], "second save")
        self.reverter.finalize_checkpoint("Second Checkpoint")

        update_file(self.config2, "update config2")
        update_file(config3, "update config3")

        # Checkpoint3 - update config1, config2
        self.reverter.add_to_checkpoint(self.sets[2], "third save")
        self.reverter.finalize_checkpoint("Third Checkpoint - Save both")

        update_file(self.config1, "Final form config1")
        update_file(self.config2, "Final form config2")
        update_file(config3, "Final form config3")

        return config3


def setup_test_files():
    """Setup sample configuration files."""
    dir1 = tempfile.mkdtemp("dir1")
    dir2 = tempfile.mkdtemp("dir2")
    config1 = os.path.join(dir1, "config.txt")
    config2 = os.path.join(dir2, "config.txt")
    with open(config1, "w") as file_fd:
        file_fd.write("directive-dir1")
    with open(config2, "w") as file_fd:
        file_fd.write("directive-dir2")

    sets = [set([config1]),
            set([config2]),
            set([config1, config2])]

    return config1, config2, dir1, dir2, sets


def get_save_notes(dire):
    """Read save notes"""
    return read_in(os.path.join(dire, "CHANGES_SINCE"))


def get_filepaths(dire):
    """Get Filepaths"""
    return read_in(os.path.join(dire, "FILEPATHS"))


def get_new_files(dire):
    """Get new files."""
    return read_in(os.path.join(dire, "NEW_FILES")).splitlines()


def get_undo_commands(dire):
    """Get new files."""
    with open(os.path.join(dire, "COMMANDS")) as csvfile:
        return list(csv.reader(csvfile))


def read_in(path):
    """Read in a file, return the str"""
    with open(path, "r") as file_fd:
        return file_fd.read()


def update_file(filename, string):
    """Update a file with a new value."""
    with open(filename, "w") as file_fd:
        file_fd.write(string)


if __name__ == "__main__":
    unittest.main()  # pragma: no cover

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
display Folder 0755
testdata Folder 0755
__init__.py File 20 B 0644
account_test.py File 14.45 KB 0644
acme_util.py File 3.18 KB 0644
auth_handler_test.py File 24 KB 0644
cert_manager_test.py File 28.07 KB 0644
cli_test.py File 19.94 KB 0644
client_test.py File 28.76 KB 0644
compat_test.py File 736 B 0644
configuration_test.py File 6.82 KB 0644
crypto_util_test.py File 13.56 KB 0644
eff_test.py File 5.94 KB 0644
error_handler_test.py File 5.31 KB 0644
errors_test.py File 1.8 KB 0644
hook_test.py File 16.67 KB 0644
lock_test.py File 3.84 KB 0644
log_test.py File 14.95 KB 0644
main_test.py File 82.52 KB 0644
notify_test.py File 2.07 KB 0644
ocsp_test.py File 6.27 KB 0644
renewal_test.py File 4.18 KB 0644
renewupdater_test.py File 5.32 KB 0644
reporter_test.py File 2.73 KB 0644
reverter_test.py File 18.7 KB 0644
storage_test.py File 42.89 KB 0644
util.py File 14.12 KB 0644
util_test.py File 21.58 KB 0644