From 4aae0b9db3da51f0c50fea9f9fb44b87d966cb7f Mon Sep 17 00:00:00 2001 From: Masin Al-Dujaili Date: Mon, 13 Apr 2020 12:18:36 +0200 Subject: [PATCH] Further adaptions --- .../__init__.py | 33 ++- certbot_dns_corenetworks/dns_corenetworks.py | 84 ++++++ .../dns_corenetworks_test.py | 51 ++++ certbot_dns_ispconfig/dns_ispconfig.py | 269 ------------------ certbot_dns_ispconfig/dns_ispconfig_test.py | 168 ----------- 5 files changed, 151 insertions(+), 454 deletions(-) rename {certbot_dns_ispconfig => certbot_dns_corenetworks}/__init__.py (69%) create mode 100644 certbot_dns_corenetworks/dns_corenetworks.py create mode 100644 certbot_dns_corenetworks/dns_corenetworks_test.py delete mode 100644 certbot_dns_ispconfig/dns_ispconfig.py delete mode 100644 certbot_dns_ispconfig/dns_ispconfig_test.py diff --git a/certbot_dns_ispconfig/__init__.py b/certbot_dns_corenetworks/__init__.py similarity index 69% rename from certbot_dns_ispconfig/__init__.py rename to certbot_dns_corenetworks/__init__.py index 242e914..30a5023 100644 --- a/certbot_dns_ispconfig/__init__.py +++ b/certbot_dns_corenetworks/__init__.py @@ -1,16 +1,16 @@ """ -The `~certbot_dns_ispconfig.dns_ispconfig` plugin automates the process of +The `~certbot_dns_corenetworks.dns_corenetworks` plugin automates the process of completing a ``dns-01`` challenge (`~acme.challenges.DNS01`) by creating, and -subsequently removing, TXT records using the ISPConfig REST API. +subsequently removing, TXT records using the Core Networks beta API. Named Arguments --------------- ======================================== ===================================== -``--dns-ispconfig-credentials`` ISPConfig Remote API credentials_ +``--dns-corenetworks-credentials`` Core Networks beta API credentials_ INI file. (Required) -``--dns-ispconfig-propagation-seconds`` The number of seconds to wait for DNS +``--dns-corenetworks-propagation-seconds`` The number of seconds to wait for DNS to propagate before asking the ACME server to verify the DNS record. (Default: 120) @@ -20,7 +20,7 @@ Named Arguments Credentials ----------- -Use of this plugin requires a configuration file containing ISPConfig Remote API +Use of this plugin requires a configuration file containing Core Networks beta API credentials, obtained from your DNSimple `System > Remote Users`. @@ -28,13 +28,12 @@ credentials, obtained from your DNSimple :name: credentials.ini :caption: Example credentials file: - # ISPCONFIG API credentials used by Certbot - dns_ispconfig_username = myispremoteuser - dns_ispconfig_password = mysecretpassword - dns_ispconfig_endpoint = https://localhost:8080 + # Core Networks API credentials used by Certbot + dns_corenetworks_username = mycorenetworksapiuser + dns_corenetworks_password = mysecretpassword The path to this file can be provided interactively or using the -``--dns-ispconfig-credentials`` command-line argument. Certbot records the path +``--dns-corenetworks-credentials`` command-line argument. Certbot records the path to this file for use during renewal, but does not store the file's contents. .. caution:: @@ -59,8 +58,8 @@ Examples :caption: To acquire a certificate for ``example.com`` certbot certonly \\ - --dns-ispconfig \\ - --dns-ispconfig-credentials ~/.secrets/certbot/ispconfig.ini \\ + --dns-corenetworks \\ + --dns-corenetworks-credentials ~/.secrets/certbot/corenetworks.ini \\ -d example.com .. code-block:: bash @@ -68,8 +67,8 @@ Examples ``www.example.com`` certbot certonly \\ - --dns-ispconfig \\ - --dns-ispconfig-credentials ~/.secrets/certbot/ispconfig.ini \\ + --dns-corenetworks \\ + --dns-corenetworks-credentials ~/.secrets/certbot/corenetworks.ini \\ -d example.com \\ -d www.example.com @@ -78,9 +77,9 @@ Examples for DNS propagation certbot certonly \\ - --dns-ispconfig \\ - --dns-ispconfig-credentials ~/.secrets/certbot/ispconfig.ini \\ - --dns-ispconfig-propagation-seconds 240 \\ + --dns-corenetworks \\ + --dns-corenetworks-credentials ~/.secrets/certbot/corenetworks.ini \\ + --dns-corenetworks-propagation-seconds 240 \\ -d example.com """ diff --git a/certbot_dns_corenetworks/dns_corenetworks.py b/certbot_dns_corenetworks/dns_corenetworks.py new file mode 100644 index 0000000..8728e07 --- /dev/null +++ b/certbot_dns_corenetworks/dns_corenetworks.py @@ -0,0 +1,84 @@ +"""DNS Authenticator for Core Networks DNS.""" +import logging + +import zope.interface +from lexicon.providers import corenetworks + +from certbot import errors +from certbot import interfaces +from certbot.plugins import dns_common +from certbot.plugins import dns_common_lexicon + +logger = logging.getLogger(__name__) + +ACCOUNT_URL = 'https://beta.api.core-networks.de/doc/#functon_auth_token' + + +@zope.interface.implementer(interfaces.IAuthenticator) +@zope.interface.provider(interfaces.IPluginFactory) +class Authenticator(dns_common.DNSAuthenticator): + """DNS Authenticator for Core Networks + + This Authenticator uses the Core Networks beta API to fulfill a dns-01 challenge. + """ + + description = 'Obtain certificates using a DNS TXT record (if you are using Core Networks for DNS).' + ttl = 60 + + def __init__(self, *args, **kwargs): + super(Authenticator, self).__init__(*args, **kwargs) + self.credentials = None + + @classmethod + def add_parser_arguments(cls, add): # pylint: disable=arguments-differ + super(Authenticator, cls).add_parser_arguments(add, default_propagation_seconds=30) + add('credentials', help='Core Networks credentials INI file.') + + def more_info(self): # pylint: disable=missing-docstring,no-self-use + return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \ + 'the Core Networks beta API.' + + def _setup_credentials(self): + self.credentials = self._configure_credentials( + 'credentials', + 'Core Networks credentials INI file', + { + 'login': 'API user name for Core Networks beta API. (See {0}.)'.format(ACCOUNT_URL), + 'password': 'Password for API user' + } + ) + + def _perform(self, domain, validation_name, validation): + self._get_corenetworks_client().add_txt_record(domain, validation_name, validation) + + def _cleanup(self, domain, validation_name, validation): + self._get_corenetworks_client().del_txt_record(domain, validation_name, validation) + + def _get_corenetworks_client(self): + return _CoreNetworksLexiconClient(self.credentials.conf('login'), self.credentials.conf('password'), self.ttl) + + +class _CoreNetworksLexiconClient(dns_common_lexicon.LexiconClient): + """ + Encapsulates all communication with the Core Networks API via Lexicon. + """ + + def __init__(self, login, password, ttl): + super(_CoreNetworksLexiconClient, self).__init__() + + config = dns_common_lexicon.build_lexicon_config('corenetworks', { + 'ttl': ttl, + }, { + 'auth_usernam': login, + 'auth_password': password + }) + + self.provider = corenetworks.Provider(config) + + def _handle_http_error(self, e, domain_name): + hint = None + if str(e).startswith('401 Client Error: Unauthorized for url:'): + hint = 'Is your API token value correct?' + + return errors.PluginError('Error determining zone identifier for {0}: {1}.{2}' + .format(domain_name, e, ' ({0})'.format(hint) if hint else '')) diff --git a/certbot_dns_corenetworks/dns_corenetworks_test.py b/certbot_dns_corenetworks/dns_corenetworks_test.py new file mode 100644 index 0000000..086cc04 --- /dev/null +++ b/certbot_dns_corenetworks/dns_corenetworks_test.py @@ -0,0 +1,51 @@ +"""Tests for certbot_dns_corenetworks.dns_corenetworks.""" + +import os +import unittest + +import mock +from requests.exceptions import HTTPError + +from certbot.plugins import dns_test_common +from certbot.plugins import dns_test_common_lexicon +from certbot.tests import util as test_util + +LOGIN = 'foo' +PASSWORD = 'bar' + +class AuthenticatorTest(test_util.TempDirTestCase, + dns_test_common_lexicon.BaseLexiconAuthenticatorTest): + + def setUp(self): + super(AuthenticatorTest, self).setUp() + + from certbot_dns_corenetworks.dns_corenetworks import Authenticator + + path = os.path.join(self.tempdir, 'file.ini') + dns_test_common.write({"corenetworks_login": LOGIN, "corenetworks_password": PASSWORD }, path) + + self.config = mock.MagicMock(corenetworks_credentials=path, + corenetworks_propagation_seconds=0) # don't wait during tests + + self.auth = Authenticator(self.config, "corenetworks") + + self.mock_client = mock.MagicMock() + # _get_corenetworks_client | pylint: disable=protected-access + self.auth._get_corenetworks_client = mock.MagicMock(return_value=self.mock_client) + + +class CoreNetworksLexiconClientTest(unittest.TestCase, dns_test_common_lexicon.BaseLexiconClientTest): + + LOGIN_ERROR = HTTPError('401 Client Error: Unauthorized for url: ...') + + def setUp(self): + from certbot_dns_corenetworks.dns_corenetworks import _CoreNetworksLexiconClient + + self.client = _CoreNetworksLexiconClient(LOGIN, PASSWORD, 0) + + self.provider_mock = mock.MagicMock() + self.client.provider = self.provider_mock + + +if __name__ == "__main__": + unittest.main() # pragma: no cover diff --git a/certbot_dns_ispconfig/dns_ispconfig.py b/certbot_dns_ispconfig/dns_ispconfig.py deleted file mode 100644 index 61d11a1..0000000 --- a/certbot_dns_ispconfig/dns_ispconfig.py +++ /dev/null @@ -1,269 +0,0 @@ -"""DNS Authenticator for ISPConfig.""" -import json -import logging -import time - -import requests -import zope.interface - -from certbot import errors -from certbot import interfaces -from certbot.plugins import dns_common - -logger = logging.getLogger(__name__) - - -@zope.interface.implementer(interfaces.IAuthenticator) -@zope.interface.provider(interfaces.IPluginFactory) -class Authenticator(dns_common.DNSAuthenticator): - """DNS Authenticator for ISPConfig - - This Authenticator uses the ISPConfig Remote REST API to fulfill a dns-01 challenge. - """ - - description = "Obtain certificates using a DNS TXT record (if you are using ISPConfig for DNS)." - ttl = 60 - - def __init__(self, *args, **kwargs): - super(Authenticator, self).__init__(*args, **kwargs) - self.credentials = None - - @classmethod - def add_parser_arguments(cls, add): # pylint: disable=arguments-differ - super(Authenticator, cls).add_parser_arguments( - add, default_propagation_seconds=120 - ) - add("credentials", help="ISPConfig credentials INI file.") - - def more_info(self): # pylint: disable=missing-docstring,no-self-use - return ( - "This plugin configures a DNS TXT record to respond to a dns-01 challenge using " - + "the ISPConfig Remote REST API." - ) - - def _setup_credentials(self): - self.credentials = self._configure_credentials( - "credentials", - "ISPConfig credentials INI file", - { - "endpoint": "URL of the ISPConfig Remote API.", - "username": "Username for ISPConfig Remote API.", - "password": "Password for ISPConfig Remote API.", - }, - ) - - def _perform(self, domain, validation_name, validation): - self._get_ispconfig_client().add_txt_record( - domain, validation_name, validation, self.ttl - ) - - def _cleanup(self, domain, validation_name, validation): - self._get_ispconfig_client().del_txt_record( - domain, validation_name, validation, self.ttl - ) - - def _get_ispconfig_client(self): - return _ISPConfigClient( - self.credentials.conf("endpoint"), - self.credentials.conf("username"), - self.credentials.conf("password"), - ) - - -class _ISPConfigClient(object): - """ - Encapsulates all communication with the ISPConfig Remote REST API. - """ - - def __init__(self, endpoint, username, password): - logger.debug("creating ispconfigclient") - self.endpoint = endpoint - self.username = username - self.password = password - self.session = requests.Session() - self.session_id = None - - def _login(self): - if self.session_id is not None: - return - logger.debug("logging in") - logindata = {"username": self.username, "password": self.password} - self.session_id = self._api_request("login", logindata) - logger.debug("session id is %s", self.session_id) - - def _api_request(self, action, data): - if self.session_id is not None: - data["session_id"] = self.session_id - url = self._get_url(action) - resp = self.session.get(url, json=data) - logger.debug("API REquest to URL: %s", url) - if resp.status_code != 200: - raise errors.PluginError( - "HTTP Error during login {0}".format(resp.status_code) - ) - try: - result = resp.json() - except: - raise errors.PluginError( - "API response with non JSON: {0}".format(resp.text) - ) - if result["code"] == "ok": - return result["response"] - elif result["code"] == "remote_fault": - raise errors.PluginError( - "API response with an error: {0}".format(result["message"]) - ) - else: - raise errors.PluginError("API response unknown {0}".format(resp.text)) - - def _get_url(self, action): - return "{0}?{1}".format(self.endpoint, action) - - def _get_server_id(self, zone_id): - zone = self._api_request("dns_zone_get", {"primary_id": zone_id}) - return zone["server_id"] - - def add_txt_record(self, domain, record_name, record_content, record_ttl): - """ - Add a TXT record using the supplied information. - - :param str domain: The domain to use to look up the managed zone. - :param str record_name: The record name (typically beginning with '_acme-challenge.'). - :param str record_content: The record content (typically the challenge validation). - :param int record_ttl: The record TTL (number of seconds that the record may be cached). - :raises certbot.errors.PluginError: if an error occurs communicating with the ISPConfig API - """ - self._login() - zone_id, zone_name = self._find_managed_zone_id(domain, record_name) - if zone_id is None: - raise errors.PluginError("Domain not known") - logger.debug("domain found: %s with id: %s", zone_name, zone_id) - o_record_name = record_name - record_name = record_name.replace(zone_name, "")[:-1] - logger.debug( - "using record_name: %s from original: %s", record_name, o_record_name - ) - record = self.get_existing_txt(zone_id, record_name, record_content) - if record is not None: - if record["data"] == record_content: - logger.info("already there, id {0}".format(record["id"])) - return - else: - logger.info("update {0}".format(record["id"])) - self._update_txt_record( - zone_id, record["id"], record_name, record_content, record_ttl - ) - else: - logger.info("insert new txt record") - self._insert_txt_record(zone_id, record_name, record_content, record_ttl) - - def del_txt_record(self, domain, record_name, record_content, record_ttl): - """ - Delete a TXT record using the supplied information. - - :param str domain: The domain to use to look up the managed zone. - :param str record_name: The record name (typically beginning with '_acme-challenge.'). - :param str record_content: The record content (typically the challenge validation). - :param int record_ttl: The record TTL (number of seconds that the record may be cached). - :raises certbot.errors.PluginError: if an error occurs communicating with the ISPConfig API - """ - self._login() - zone_id, zone_name = self._find_managed_zone_id(domain, record_name) - if zone_id is None: - raise errors.PluginError("Domain not known") - logger.debug("domain found: %s with id: %s", zone_name, zone_id) - o_record_name = record_name - record_name = record_name.replace(zone_name, "")[:-1] - logger.debug( - "using record_name: %s from original: %s", record_name, o_record_name - ) - record = self.get_existing_txt(zone_id, record_name, record_content) - if record is not None: - if record["data"] == record_content: - logger.debug("delete TXT record: %s", record["id"]) - self._delete_txt_record(record["id"]) - - def _prepare_rr_data(self, zone_id, record_name, record_content, record_ttl): - server_id = self._get_server_id(zone_id) - data = { - "client_id": None, - "rr_type": "TXT", - "params": { - "server_id": server_id, - "name": record_name, - "active": "Y", - "type": "TXT", - "data": record_content, - "zone": zone_id, - "ttl": record_ttl, - "update_serial": False, - "stamp": time.strftime('%Y-%m-%d %H:%M:%S'), - }, - } - return data - - def _insert_txt_record(self, zone_id, record_name, record_content, record_ttl): - data = self._prepare_rr_data(zone_id, record_name, record_content, record_ttl) - logger.debug("insert with data: %s", data) - result = self._api_request("dns_txt_add", data) - - def _update_txt_record( - self, zone_id, primary_id, record_name, record_content, record_ttl - ): - data = self._prepare_rr_data(zone_id, record_name, record_content, record_ttl) - data["primary_id"] = primary_id - logger.debug("update with data: %s", data) - result = self._api_request("dns_txt_update", data) - - def _delete_txt_record(self, primary_id): - data = {"primary_id": primary_id} - logger.debug("delete with data: %s", data) - result = self._api_request("dns_txt_delete", data) - - def _find_managed_zone_id(self, domain, record_name): - """ - Find the managed zone for a given domain. - - :param str domain: The domain for which to find the managed zone. - :returns: The ID of the managed zone, if found. - :rtype: str - :raises certbot.errors.PluginError: if the managed zone cannot be found. - """ - - zone_dns_name_guesses = [record_name] + dns_common.base_domain_name_guesses(domain) - - for zone_name in zone_dns_name_guesses: - # get the zone id - try: - logger.debug("looking for zone: %s", zone_name) - zone_id = self._api_request("dns_zone_get_id", {"origin": zone_name}) - return zone_id, zone_name - except errors.PluginError as e: - pass - return None - - def get_existing_txt(self, zone_id, record_name, record_content): - """ - Get existing TXT records from the RRset for the record name. - - If an error occurs while requesting the record set, it is suppressed - and None is returned. - - :param str zone_id: The ID of the managed zone. - :param str record_name: The record name (typically beginning with '_acme-challenge.'). - - :returns: TXT record value or None - :rtype: `string` or `None` - - """ - self._login() - read_zone_data = {"zone_id": zone_id} - zone_data = self._api_request("dns_rr_get_all_by_zone", read_zone_data) - for entry in zone_data: - if ( - entry["name"] == record_name - and entry["type"] == "TXT" - and entry["data"] == record_content - ): - return entry - return None diff --git a/certbot_dns_ispconfig/dns_ispconfig_test.py b/certbot_dns_ispconfig/dns_ispconfig_test.py deleted file mode 100644 index 7c5839a..0000000 --- a/certbot_dns_ispconfig/dns_ispconfig_test.py +++ /dev/null @@ -1,168 +0,0 @@ -"""Tests for certbot_dns_ispconfig.dns_ispconfig.""" - -import unittest - -import mock -import json -import requests_mock - -from certbot import errors -from certbot.compat import os -from certbot.errors import PluginError -from certbot.plugins import dns_test_common -from certbot.plugins.dns_test_common import DOMAIN -from certbot.tests import util as test_util - -FAKE_USER = "remoteuser" -FAKE_PW = "password" -FAKE_ENDPOINT = "mock://endpoint" - - -class AuthenticatorTest( - test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest -): - def setUp(self): - super(AuthenticatorTest, self).setUp() - - from certbot_dns_ispconfig.dns_ispconfig import Authenticator - - path = os.path.join(self.tempdir, "file.ini") - dns_test_common.write( - { - "ispconfig_username": FAKE_USER, - "ispconfig_password": FAKE_PW, - "ispconfig_endpoint": FAKE_ENDPOINT, - }, - path, - ) - - super(AuthenticatorTest, self).setUp() - self.config = mock.MagicMock( - ispconfig_credentials=path, ispconfig_propagation_seconds=0 - ) # don't wait during tests - - self.auth = Authenticator(self.config, "ispconfig") - - self.mock_client = mock.MagicMock() - # _get_ispconfig_client | pylint: disable=protected-access - self.auth._get_ispconfig_client = mock.MagicMock(return_value=self.mock_client) - - def test_perform(self): - self.auth.perform([self.achall]) - - expected = [ - mock.call.add_txt_record( - DOMAIN, "_acme-challenge." + DOMAIN, mock.ANY, mock.ANY - ) - ] - self.assertEqual(expected, self.mock_client.mock_calls) - - def test_cleanup(self): - # _attempt_cleanup | pylint: disable=protected-access - self.auth._attempt_cleanup = True - self.auth.cleanup([self.achall]) - - expected = [ - mock.call.del_txt_record( - DOMAIN, "_acme-challenge." + DOMAIN, mock.ANY, mock.ANY - ) - ] - self.assertEqual(expected, self.mock_client.mock_calls) - - -class ISPConfigClientTest(unittest.TestCase): - record_name = "foo" - record_content = "bar" - record_ttl = 42 - - def setUp(self): - from certbot_dns_ispconfig.dns_ispconfig import _ISPConfigClient - - self.adapter = requests_mock.Adapter() - - self.client = _ISPConfigClient(FAKE_ENDPOINT, FAKE_USER, FAKE_PW) - self.client.session.mount("mock", self.adapter) - - def _register_response( - self, ep_id, response=None, message=None, additional_matcher=None, **kwargs - ): - resp = {"code": "ok", "message": message, "response": response} - if message is not None: - resp["code"] = "remote_failure" - - def add_matcher(request): - data = json.loads(request.text) - add_result = True - if additional_matcher is not None: - add_result = additionsal_matcher(request) - - return ( - ( - ("username" in data and data["username"] == FAKE_USER) - and ("username" in data and data["password"] == FAKE_PW) - ) - or data["session_id"] == "FAKE_SESSION" - ) and add_result - - self.adapter.register_uri( - requests_mock.ANY, - "{0}?{1}".format(FAKE_ENDPOINT, ep_id), - text=json.dumps(resp), - additional_matcher=add_matcher, - **kwargs - ) - - def test_add_txt_record(self): - self._register_response("login", response="FAKE_SESSION") - self._register_response("dns_zone_get_id", response=23) - self._register_response("dns_txt_add", response=99) - self._register_response( - "dns_zone_get", response={"zone_id": 102, "server_id": 1} - ) - self._register_response("dns_rr_get_all_by_zone", response=[]) - self.client.add_txt_record( - DOMAIN, self.record_name, self.record_content, self.record_ttl - ) - - def test_add_txt_record_fail_to_find_domain(self): - self._register_response("login", response="FAKE_SESSION") - self._register_response("dns_zone_get_id", message="Not Found") - with self.assertRaises(errors.PluginError) as context: - self.client.add_txt_record( - DOMAIN, self.record_name, self.record_content, self.record_ttl - ) - - def test_add_txt_record_fail_to_authenticate(self): - self._register_response("login", message="FAILED") - with self.assertRaises(errors.PluginError) as context: - self.client.add_txt_record( - DOMAIN, self.record_name, self.record_content, self.record_ttl - ) - - def test_del_txt_record(self): - self._register_response("login", response="FAKE_SESSION") - self._register_response("dns_zone_get_id", response=23) - self._register_response("dns_rr_get_all_by_zone", response=[]) - self._register_response("dns_txt_delete", response="") - self.client.del_txt_record( - DOMAIN, self.record_name, self.record_content, self.record_ttl - ) - - def test_del_txt_record_fail_to_find_domain(self): - self._register_response("login", response="FAKE_SESSION") - self._register_response("dns_zone_get_id", message="Not Found") - with self.assertRaises(errors.PluginError) as context: - self.client.del_txt_record( - DOMAIN, self.record_name, self.record_content, self.record_ttl - ) - - def test_del_txt_record_fail_to_authenticate(self): - self._register_response("login", message="FAILED") - with self.assertRaises(errors.PluginError) as context: - self.client.del_txt_record( - DOMAIN, self.record_name, self.record_content, self.record_ttl - ) - - -if __name__ == "__main__": - unittest.main() # pragma: no cover