404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.15.26.71: ~ $
"""Tests for certbot.lock."""
import functools
import multiprocessing
import os
import unittest

import mock

from certbot import errors
from certbot.tests import util as test_util


@test_util.broken_on_windows
class LockDirTest(test_util.TempDirTestCase):
    """Tests for certbot.lock.lock_dir."""
    @classmethod
    def _call(cls, *args, **kwargs):
        from certbot.lock import lock_dir
        return lock_dir(*args, **kwargs)

    def test_it(self):
        assert_raises = functools.partial(
            self.assertRaises, errors.LockError, self._call, self.tempdir)
        lock_path = os.path.join(self.tempdir, '.certbot.lock')
        test_util.lock_and_call(assert_raises, lock_path)


@test_util.broken_on_windows
class LockFileTest(test_util.TempDirTestCase):
    """Tests for certbot.lock.LockFile."""
    @classmethod
    def _call(cls, *args, **kwargs):
        from certbot.lock import LockFile
        return LockFile(*args, **kwargs)

    def setUp(self):
        super(LockFileTest, self).setUp()
        self.lock_path = os.path.join(self.tempdir, 'test.lock')

    def test_acquire_without_deletion(self):
        # acquire the lock in another process but don't delete the file
        child = multiprocessing.Process(target=self._call,
                                        args=(self.lock_path,))
        child.start()
        child.join()
        self.assertEqual(child.exitcode, 0)
        self.assertTrue(os.path.exists(self.lock_path))

        # Test we're still able to properly acquire and release the lock
        self.test_removed()

    def test_contention(self):
        assert_raises = functools.partial(
            self.assertRaises, errors.LockError, self._call, self.lock_path)
        test_util.lock_and_call(assert_raises, self.lock_path)

    def test_locked_repr(self):
        lock_file = self._call(self.lock_path)
        locked_repr = repr(lock_file)
        self._test_repr_common(lock_file, locked_repr)
        self.assertTrue('acquired' in locked_repr)

    def test_released_repr(self):
        lock_file = self._call(self.lock_path)
        lock_file.release()
        released_repr = repr(lock_file)
        self._test_repr_common(lock_file, released_repr)
        self.assertTrue('released' in released_repr)

    def _test_repr_common(self, lock_file, lock_repr):
        self.assertTrue(lock_file.__class__.__name__ in lock_repr)
        self.assertTrue(self.lock_path in lock_repr)

    def test_race(self):
        should_delete = [True, False]
        stat = os.stat

        def delete_and_stat(path):
            """Wrap os.stat and maybe delete the file first."""
            if path == self.lock_path and should_delete.pop(0):
                os.remove(path)
            return stat(path)

        with mock.patch('certbot.lock.os.stat') as mock_stat:
            mock_stat.side_effect = delete_and_stat
            self._call(self.lock_path)
        self.assertFalse(should_delete)

    def test_removed(self):
        lock_file = self._call(self.lock_path)
        lock_file.release()
        self.assertFalse(os.path.exists(self.lock_path))

    @mock.patch('certbot.compat.fcntl.lockf')
    def test_unexpected_lockf_err(self, mock_lockf):
        msg = 'hi there'
        mock_lockf.side_effect = IOError(msg)
        try:
            self._call(self.lock_path)
        except IOError as err:
            self.assertTrue(msg in str(err))
        else:  # pragma: no cover
            self.fail('IOError not raised')

    @mock.patch('certbot.lock.os.stat')
    def test_unexpected_stat_err(self, mock_stat):
        msg = 'hi there'
        mock_stat.side_effect = OSError(msg)
        try:
            self._call(self.lock_path)
        except OSError as err:
            self.assertTrue(msg in str(err))
        else:  # pragma: no cover
            self.fail('OSError not raised')


if __name__ == "__main__":
    unittest.main()  # pragma: no cover

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
display Folder 0755
testdata Folder 0755
__init__.py File 20 B 0644
account_test.py File 14.45 KB 0644
acme_util.py File 3.18 KB 0644
auth_handler_test.py File 24 KB 0644
cert_manager_test.py File 28.07 KB 0644
cli_test.py File 19.94 KB 0644
client_test.py File 28.76 KB 0644
compat_test.py File 736 B 0644
configuration_test.py File 6.82 KB 0644
crypto_util_test.py File 13.56 KB 0644
eff_test.py File 5.94 KB 0644
error_handler_test.py File 5.31 KB 0644
errors_test.py File 1.8 KB 0644
hook_test.py File 16.67 KB 0644
lock_test.py File 3.84 KB 0644
log_test.py File 14.95 KB 0644
main_test.py File 82.52 KB 0644
notify_test.py File 2.07 KB 0644
ocsp_test.py File 6.27 KB 0644
renewal_test.py File 4.18 KB 0644
renewupdater_test.py File 5.32 KB 0644
reporter_test.py File 2.73 KB 0644
reverter_test.py File 18.7 KB 0644
storage_test.py File 42.89 KB 0644
util.py File 14.12 KB 0644
util_test.py File 21.58 KB 0644