"""Tests for certbot.client."""
import os
import platform
import shutil
import tempfile
import unittest
import mock
from certbot import account
from certbot import errors
from certbot import util
import certbot.tests.util as test_util
from josepy import interfaces
KEY = test_util.load_vector("rsa512_key.pem")
CSR_SAN = test_util.load_vector("csr-san_512.pem")
class DetermineUserAgentTest(test_util.ConfigTestCase):
"""Tests for certbot.client.determine_user_agent."""
def _call(self):
from certbot.client import determine_user_agent
return determine_user_agent(self.config)
@mock.patch.dict(os.environ, {"CERTBOT_DOCS": "1"})
def test_docs_value(self):
self._test(expect_doc_values=True)
@mock.patch.dict(os.environ, {})
def test_real_values(self):
self._test(expect_doc_values=False)
def _test(self, expect_doc_values):
ua = self._call()
if expect_doc_values:
doc_value_check = self.assertIn
real_value_check = self.assertNotIn
else:
doc_value_check = self.assertNotIn
real_value_check = self.assertIn
doc_value_check("certbot(-auto)", ua)
doc_value_check("OS_NAME OS_VERSION", ua)
doc_value_check("major.minor.patchlevel", ua)
real_value_check(util.get_os_info_ua(), ua)
real_value_check(platform.python_version(), ua)
class RegisterTest(test_util.ConfigTestCase):
"""Tests for certbot.client.register."""
def setUp(self):
super(RegisterTest, self).setUp()
self.config.rsa_key_size = 1024
self.config.register_unsafely_without_email = False
self.config.email = "alias@example.com"
self.account_storage = account.AccountMemoryStorage()
def _call(self):
from certbot.client import register
tos_cb = mock.MagicMock()
return register(self.config, self.account_storage, tos_cb)
@staticmethod
def _public_key_mock():
m = mock.Mock(__class__=interfaces.JSONDeSerializable)
m.to_partial_json.return_value = '{"a": 1}'
return m
@staticmethod
def _new_acct_dir_mock():
return "/acme/new-account"
@staticmethod
def _true_mock():
return True
@staticmethod
def _false_mock():
return False
def test_no_tos(self):
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
mock_client.new_account_and_tos().terms_of_service = "http://tos"
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot.eff.handle_subscription") as mock_handle:
with mock.patch("certbot.account.report_new_account"):
mock_client().new_account_and_tos.side_effect = errors.Error
self.assertRaises(errors.Error, self._call)
self.assertFalse(mock_handle.called)
mock_client().new_account_and_tos.side_effect = None
self._call()
self.assertTrue(mock_handle.called)
def test_it(self):
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot.account.report_new_account"):
with mock.patch("certbot.eff.handle_subscription"):
self._call()
@mock.patch("certbot.account.report_new_account")
@mock.patch("certbot.client.display_ops.get_email")
def test_email_retry(self, _rep, mock_get_email):
from acme import messages
self.config.noninteractive_mode = False
msg = "DNS problem: NXDOMAIN looking up MX for example.com"
mx_err = messages.Error.with_code('invalidContact', detail=msg)
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot.eff.handle_subscription") as mock_handle:
mock_client().new_account_and_tos.side_effect = [mx_err, mock.MagicMock()]
self._call()
self.assertEqual(mock_get_email.call_count, 1)
self.assertTrue(mock_handle.called)
@mock.patch("certbot.account.report_new_account")
def test_email_invalid_noninteractive(self, _rep):
from acme import messages
self.config.noninteractive_mode = True
msg = "DNS problem: NXDOMAIN looking up MX for example.com"
mx_err = messages.Error.with_code('invalidContact', detail=msg)
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot.eff.handle_subscription"):
mock_client().new_account_and_tos.side_effect = [mx_err, mock.MagicMock()]
self.assertRaises(errors.Error, self._call)
def test_needs_email(self):
self.config.email = None
self.assertRaises(errors.Error, self._call)
@mock.patch("certbot.client.logger")
def test_without_email(self, mock_logger):
with mock.patch("certbot.eff.handle_subscription") as mock_handle:
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_clnt:
mock_clnt().external_account_required.side_effect = self._false_mock
with mock.patch("certbot.account.report_new_account"):
self.config.email = None
self.config.register_unsafely_without_email = True
self.config.dry_run = False
self._call()
mock_logger.info.assert_called_once_with(mock.ANY)
self.assertTrue(mock_handle.called)
@mock.patch("certbot.account.report_new_account")
@mock.patch("certbot.client.display_ops.get_email")
def test_dry_run_no_staging_account(self, _rep, mock_get_email):
"""Tests dry-run for no staging account, expect account created with no email"""
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot.eff.handle_subscription"):
with mock.patch("certbot.account.report_new_account"):
self.config.dry_run = True
self._call()
# check Certbot did not ask the user to provide an email
self.assertFalse(mock_get_email.called)
# check Certbot created an account with no email. Contact should return empty
self.assertFalse(mock_client().new_account_and_tos.call_args[0][0].contact)
def test_with_eab_arguments(self):
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
mock_client().client.directory.__getitem__ = mock.Mock(
side_effect=self._new_acct_dir_mock
)
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot.eff.handle_subscription"):
target = "certbot.client.messages.ExternalAccountBinding.from_data"
with mock.patch(target) as mock_eab_from_data:
self.config.eab_kid = "test-kid"
self.config.eab_hmac_key = "J2OAqW4MHXsrHVa_PVg0Y-L_R4SYw0_aL1le6mfblbE"
self._call()
self.assertTrue(mock_eab_from_data.called)
def test_without_eab_arguments(self):
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot.eff.handle_subscription"):
target = "certbot.client.messages.ExternalAccountBinding.from_data"
with mock.patch(target) as mock_eab_from_data:
self.config.eab_kid = None
self.config.eab_hmac_key = None
self._call()
self.assertFalse(mock_eab_from_data.called)
def test_external_account_required_without_eab_arguments(self):
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
mock_client().client.net.key.public_key = mock.Mock(side_effect=self._public_key_mock)
mock_client().external_account_required.side_effect = self._true_mock
with mock.patch("certbot.eff.handle_subscription"):
with mock.patch("certbot.client.messages.ExternalAccountBinding.from_data"):
self.config.eab_kid = None
self.config.eab_hmac_key = None
self.assertRaises(errors.Error, self._call)
def test_unsupported_error(self):
from acme import messages
msg = "Test"
mx_err = messages.Error(detail=msg, typ="malformed", title="title")
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
mock_client().client.directory.__getitem__ = mock.Mock(
side_effect=self._new_acct_dir_mock
)
mock_client().external_account_required.side_effect = self._false_mock
with mock.patch("certbot.eff.handle_subscription") as mock_handle:
mock_client().new_account_and_tos.side_effect = [mx_err, mock.MagicMock()]
self.assertRaises(messages.Error, self._call)
self.assertFalse(mock_handle.called)
class ClientTestCommon(test_util.ConfigTestCase):
"""Common base class for certbot.client.Client tests."""
def setUp(self):
super(ClientTestCommon, self).setUp()
self.config.no_verify_ssl = False
self.config.allow_subset_of_names = False
# pylint: disable=star-args
self.account = mock.MagicMock(**{"key.pem": KEY})
from certbot.client import Client
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as acme:
self.acme_client = acme
self.acme = acme.return_value = mock.MagicMock()
self.client = Client(
config=self.config, account_=self.account,
auth=None, installer=None)
class ClientTest(ClientTestCommon):
"""Tests for certbot.client.Client."""
def setUp(self):
super(ClientTest, self).setUp()
self.config.allow_subset_of_names = False
self.config.dry_run = False
self.eg_domains = ["example.com", "www.example.com"]
self.eg_order = mock.MagicMock(
authorizations=[None],
csr_pem=mock.sentinel.csr_pem)
def test_init_acme_verify_ssl(self):
net = self.acme_client.call_args[0][0]
self.assertTrue(net.verify_ssl)
def _mock_obtain_certificate(self):
self.client.auth_handler = mock.MagicMock()
self.client.auth_handler.handle_authorizations.return_value = [None]
self.acme.finalize_order.return_value = self.eg_order
self.acme.new_order.return_value = self.eg_order
self.eg_order.update.return_value = self.eg_order
def _check_obtain_certificate(self, auth_count=1):
if auth_count == 1:
self.client.auth_handler.handle_authorizations.assert_called_once_with(
self.eg_order,
self.config.allow_subset_of_names)
else:
self.assertEqual(self.client.auth_handler.handle_authorizations.call_count, auth_count)
self.acme.finalize_order.assert_called_once_with(
self.eg_order, mock.ANY)
@mock.patch("certbot.client.crypto_util")
@mock.patch("certbot.client.logger")
@test_util.patch_get_utility()
def test_obtain_certificate_from_csr(self, unused_mock_get_utility,
mock_logger, mock_crypto_util):
self._mock_obtain_certificate()
test_csr = util.CSR(form="pem", file=None, data=CSR_SAN)
auth_handler = self.client.auth_handler
self._set_mock_from_fullchain(mock_crypto_util.cert_and_chain_from_fullchain)
orderr = self.acme.new_order(test_csr.data)
auth_handler.handle_authorizations(orderr, False)
self.assertEqual(
(mock.sentinel.cert, mock.sentinel.chain),
self.client.obtain_certificate_from_csr(
test_csr,
orderr=orderr))
# and that the cert was obtained correctly
self._check_obtain_certificate()
# Test for orderr=None
self.assertEqual(
(mock.sentinel.cert, mock.sentinel.chain),
self.client.obtain_certificate_from_csr(
test_csr,
orderr=None))
auth_handler.handle_authorizations.assert_called_with(self.eg_order, False)
# Test for no auth_handler
self.client.auth_handler = None
self.assertRaises(
errors.Error,
self.client.obtain_certificate_from_csr,
test_csr)
mock_logger.warning.assert_called_once_with(mock.ANY)
@mock.patch("certbot.client.crypto_util")
def test_obtain_certificate(self, mock_crypto_util):
csr = util.CSR(form="pem", file=None, data=CSR_SAN)
mock_crypto_util.init_save_csr.return_value = csr
mock_crypto_util.init_save_key.return_value = mock.sentinel.key
self._set_mock_from_fullchain(mock_crypto_util.cert_and_chain_from_fullchain)
self._test_obtain_certificate_common(mock.sentinel.key, csr)
mock_crypto_util.init_save_key.assert_called_once_with(
self.config.rsa_key_size, self.config.key_dir)
mock_crypto_util.init_save_csr.assert_called_once_with(
mock.sentinel.key, self.eg_domains, self.config.csr_dir)
mock_crypto_util.cert_and_chain_from_fullchain.assert_called_once_with(
self.eg_order.fullchain_pem)
@mock.patch("certbot.client.crypto_util")
@mock.patch("os.remove")
def test_obtain_certificate_partial_success(self, mock_remove, mock_crypto_util):
csr = util.CSR(form="pem", file=mock.sentinel.csr_file, data=CSR_SAN)
key = util.CSR(form="pem", file=mock.sentinel.key_file, data=CSR_SAN)
mock_crypto_util.init_save_csr.return_value = csr
mock_crypto_util.init_save_key.return_value = key
self._set_mock_from_fullchain(mock_crypto_util.cert_and_chain_from_fullchain)
authzr = self._authzr_from_domains(["example.com"])
self.config.allow_subset_of_names = True
self._test_obtain_certificate_common(key, csr, authzr_ret=authzr, auth_count=2)
self.assertEqual(mock_crypto_util.init_save_key.call_count, 2)
self.assertEqual(mock_crypto_util.init_save_csr.call_count, 2)
self.assertEqual(mock_remove.call_count, 2)
self.assertEqual(mock_crypto_util.cert_and_chain_from_fullchain.call_count, 1)
@mock.patch("certbot.client.crypto_util")
@mock.patch("certbot.client.acme_crypto_util")
def test_obtain_certificate_dry_run(self, mock_acme_crypto, mock_crypto):
csr = util.CSR(form="pem", file=None, data=CSR_SAN)
mock_acme_crypto.make_csr.return_value = CSR_SAN
mock_crypto.make_key.return_value = mock.sentinel.key_pem
key = util.Key(file=None, pem=mock.sentinel.key_pem)
self._set_mock_from_fullchain(mock_crypto.cert_and_chain_from_fullchain)
self.client.config.dry_run = True
self._test_obtain_certificate_common(key, csr)
mock_crypto.make_key.assert_called_once_with(self.config.rsa_key_size)
mock_acme_crypto.make_csr.assert_called_once_with(
mock.sentinel.key_pem, self.eg_domains, self.config.must_staple)
mock_crypto.init_save_key.assert_not_called()
mock_crypto.init_save_csr.assert_not_called()
self.assertEqual(mock_crypto.cert_and_chain_from_fullchain.call_count, 1)
def _set_mock_from_fullchain(self, mock_from_fullchain):
mock_cert = mock.Mock()
mock_cert.encode.return_value = mock.sentinel.cert
mock_chain = mock.Mock()
mock_chain.encode.return_value = mock.sentinel.chain
mock_from_fullchain.return_value = (mock_cert, mock_chain)
def _authzr_from_domains(self, domains):
authzr = []
# domain ordering should not be affected by authorization order
for domain in reversed(domains):
authzr.append(
mock.MagicMock(
body=mock.MagicMock(
identifier=mock.MagicMock(
value=domain))))
return authzr
def _test_obtain_certificate_common(self, key, csr, authzr_ret=None, auth_count=1):
self._mock_obtain_certificate()
# return_value is essentially set to (None, None) in
# _mock_obtain_certificate(), which breaks this test.
# Thus fixed by the next line.
authzr = authzr_ret or self._authzr_from_domains(self.eg_domains)
self.eg_order.authorizations = authzr
self.client.auth_handler.handle_authorizations.return_value = authzr
with test_util.patch_get_utility():
result = self.client.obtain_certificate(self.eg_domains)
self.assertEqual(
result,
(mock.sentinel.cert, mock.sentinel.chain, key, csr))
self._check_obtain_certificate(auth_count)
@mock.patch('certbot.client.Client.obtain_certificate')
@mock.patch('certbot.storage.RenewableCert.new_lineage')
def test_obtain_and_enroll_certificate(self,
mock_storage, mock_obtain_certificate):
domains = ["*.example.com", "example.com"]
mock_obtain_certificate.return_value = (mock.MagicMock(),
mock.MagicMock(), mock.MagicMock(), None)
self.client.config.dry_run = False
self.assertTrue(self.client.obtain_and_enroll_certificate(domains, "example_cert"))
self.assertTrue(self.client.obtain_and_enroll_certificate(domains, None))
self.assertTrue(self.client.obtain_and_enroll_certificate(domains[1:], None))
self.client.config.dry_run = True
self.assertFalse(self.client.obtain_and_enroll_certificate(domains, None))
names = [call[0][0] for call in mock_storage.call_args_list]
self.assertEqual(names, ["example_cert", "example.com", "example.com"])
@mock.patch("certbot.cli.helpful_parser")
def test_save_certificate(self, mock_parser):
# pylint: disable=too-many-locals
certs = ["cert_512.pem", "cert-san_512.pem"]
tmp_path = tempfile.mkdtemp()
os.chmod(tmp_path, 0o755) # TODO: really??
cert_pem = test_util.load_vector(certs[0])
chain_pem = (test_util.load_vector(certs[0]) + test_util.load_vector(certs[1]))
candidate_cert_path = os.path.join(tmp_path, "certs", "cert_512.pem")
candidate_chain_path = os.path.join(tmp_path, "chains", "chain.pem")
candidate_fullchain_path = os.path.join(tmp_path, "chains", "fullchain.pem")
mock_parser.verb = "certonly"
mock_parser.args = ["--cert-path", candidate_cert_path,
"--chain-path", candidate_chain_path,
"--fullchain-path", candidate_fullchain_path]
cert_path, chain_path, fullchain_path = self.client.save_certificate(
cert_pem, chain_pem, candidate_cert_path, candidate_chain_path,
candidate_fullchain_path)
self.assertEqual(os.path.dirname(cert_path),
os.path.dirname(candidate_cert_path))
self.assertEqual(os.path.dirname(chain_path),
os.path.dirname(candidate_chain_path))
self.assertEqual(os.path.dirname(fullchain_path),
os.path.dirname(candidate_fullchain_path))
with open(cert_path, "rb") as cert_file:
cert_contents = cert_file.read()
self.assertEqual(cert_contents, test_util.load_vector(certs[0]))
with open(chain_path, "rb") as chain_file:
chain_contents = chain_file.read()
self.assertEqual(chain_contents, test_util.load_vector(certs[0]) +
test_util.load_vector(certs[1]))
shutil.rmtree(tmp_path)
def test_deploy_certificate_success(self):
self.assertRaises(errors.Error, self.client.deploy_certificate,
["foo.bar"], "key", "cert", "chain", "fullchain")
installer = mock.MagicMock()
self.client.installer = installer
self.client.deploy_certificate(
["foo.bar"], "key", "cert", "chain", "fullchain")
installer.deploy_cert.assert_called_once_with(
cert_path=os.path.abspath("cert"),
chain_path=os.path.abspath("chain"),
domain='foo.bar',
fullchain_path='fullchain',
key_path=os.path.abspath("key"))
self.assertEqual(installer.save.call_count, 2)
installer.restart.assert_called_once_with()
def test_deploy_certificate_failure(self):
installer = mock.MagicMock()
self.client.installer = installer
installer.deploy_cert.side_effect = errors.PluginError
self.assertRaises(errors.PluginError, self.client.deploy_certificate,
["foo.bar"], "key", "cert", "chain", "fullchain")
installer.recovery_routine.assert_called_once_with()
def test_deploy_certificate_save_failure(self):
installer = mock.MagicMock()
self.client.installer = installer
installer.save.side_effect = errors.PluginError
self.assertRaises(errors.PluginError, self.client.deploy_certificate,
["foo.bar"], "key", "cert", "chain", "fullchain")
installer.recovery_routine.assert_called_once_with()
@test_util.patch_get_utility()
def test_deploy_certificate_restart_failure(self, mock_get_utility):
installer = mock.MagicMock()
installer.restart.side_effect = [errors.PluginError, None]
self.client.installer = installer
self.assertRaises(errors.PluginError, self.client.deploy_certificate,
["foo.bar"], "key", "cert", "chain", "fullchain")
self.assertEqual(mock_get_utility().add_message.call_count, 1)
installer.rollback_checkpoints.assert_called_once_with()
self.assertEqual(installer.restart.call_count, 2)
@test_util.patch_get_utility()
def test_deploy_certificate_restart_failure2(self, mock_get_utility):
installer = mock.MagicMock()
installer.restart.side_effect = errors.PluginError
installer.rollback_checkpoints.side_effect = errors.ReverterError
self.client.installer = installer
self.assertRaises(errors.PluginError, self.client.deploy_certificate,
["foo.bar"], "key", "cert", "chain", "fullchain")
self.assertEqual(mock_get_utility().add_message.call_count, 1)
installer.rollback_checkpoints.assert_called_once_with()
self.assertEqual(installer.restart.call_count, 1)
class EnhanceConfigTest(ClientTestCommon):
"""Tests for certbot.client.Client.enhance_config."""
def setUp(self):
super(EnhanceConfigTest, self).setUp()
self.config.hsts = False
self.config.redirect = False
self.config.staple = False
self.config.uir = False
self.domain = "example.org"
def test_no_installer(self):
self.assertRaises(
errors.Error, self.client.enhance_config, [self.domain], None)
@mock.patch("certbot.client.enhancements")
def test_unsupported(self, mock_enhancements):
self.client.installer = mock.MagicMock()
self.client.installer.supported_enhancements.return_value = []
self.config.redirect = None
self.config.hsts = True
with mock.patch("certbot.client.logger") as mock_logger:
self.client.enhance_config([self.domain], None)
self.assertEqual(mock_logger.warning.call_count, 1)
self.client.installer.enhance.assert_not_called()
mock_enhancements.ask.assert_not_called()
@mock.patch("certbot.client.logger")
def test_already_exists_header(self, mock_log):
self.config.hsts = True
self._test_with_already_existing()
self.assertTrue(mock_log.warning.called)
self.assertEqual(mock_log.warning.call_args[0][1],
'Strict-Transport-Security')
@mock.patch("certbot.client.logger")
def test_already_exists_redirect(self, mock_log):
self.config.redirect = True
self._test_with_already_existing()
self.assertTrue(mock_log.warning.called)
self.assertEqual(mock_log.warning.call_args[0][1],
'redirect')
def test_no_ask_hsts(self):
self.config.hsts = True
self._test_with_all_supported()
self.client.installer.enhance.assert_called_with(
self.domain, "ensure-http-header", "Strict-Transport-Security")
def test_no_ask_redirect(self):
self.config.redirect = True
self._test_with_all_supported()
self.client.installer.enhance.assert_called_with(
self.domain, "redirect", None)
def test_no_ask_staple(self):
self.config.staple = True
self._test_with_all_supported()
self.client.installer.enhance.assert_called_with(
self.domain, "staple-ocsp", None)
def test_no_ask_uir(self):
self.config.uir = True
self._test_with_all_supported()
self.client.installer.enhance.assert_called_with(
self.domain, "ensure-http-header", "Upgrade-Insecure-Requests")
def test_enhance_failure(self):
self.client.installer = mock.MagicMock()
self.client.installer.enhance.side_effect = errors.PluginError
self._test_error()
self.client.installer.recovery_routine.assert_called_once_with()
def test_save_failure(self):
self.client.installer = mock.MagicMock()
self.client.installer.save.side_effect = errors.PluginError
self._test_error()
self.client.installer.recovery_routine.assert_called_once_with()
self.client.installer.save.assert_called_once_with(mock.ANY)
def test_restart_failure(self):
self.client.installer = mock.MagicMock()
self.client.installer.restart.side_effect = [errors.PluginError, None]
self._test_error_with_rollback()
def test_restart_failure2(self):
installer = mock.MagicMock()
installer.restart.side_effect = errors.PluginError
installer.rollback_checkpoints.side_effect = errors.ReverterError
self.client.installer = installer
self._test_error_with_rollback()
@mock.patch("certbot.client.enhancements.ask")
def test_ask(self, mock_ask):
self.config.redirect = None
mock_ask.return_value = True
self._test_with_all_supported()
def _test_error_with_rollback(self):
self._test_error()
self.assertTrue(self.client.installer.restart.called)
def _test_error(self):
self.config.redirect = True
with test_util.patch_get_utility() as mock_gu:
self.assertRaises(
errors.PluginError, self._test_with_all_supported)
self.assertEqual(mock_gu().add_message.call_count, 1)
def _test_with_all_supported(self):
if self.client.installer is None:
self.client.installer = mock.MagicMock()
self.client.installer.supported_enhancements.return_value = [
"ensure-http-header", "redirect", "staple-ocsp"]
self.client.enhance_config([self.domain], None)
self.assertEqual(self.client.installer.save.call_count, 1)
self.assertEqual(self.client.installer.restart.call_count, 1)
def _test_with_already_existing(self):
self.client.installer = mock.MagicMock()
self.client.installer.supported_enhancements.return_value = [
"ensure-http-header", "redirect", "staple-ocsp"]
self.client.installer.enhance.side_effect = errors.PluginEnhancementAlreadyPresent()
self.client.enhance_config([self.domain], None)
class RollbackTest(unittest.TestCase):
"""Tests for certbot.client.rollback."""
def setUp(self):
self.m_install = mock.MagicMock()
@classmethod
def _call(cls, checkpoints, side_effect):
from certbot.client import rollback
with mock.patch("certbot.client.plugin_selection.pick_installer") as mpi:
mpi.side_effect = side_effect
rollback(None, checkpoints, {}, mock.MagicMock())
def test_no_problems(self):
self._call(1, self.m_install)
self.assertEqual(self.m_install().rollback_checkpoints.call_count, 1)
self.assertEqual(self.m_install().restart.call_count, 1)
def test_no_installer(self):
self._call(1, None) # Just make sure no exceptions are raised
if __name__ == "__main__":
unittest.main() # pragma: no cover