Update for modern Python compatibility

This commit is contained in:
2025-11-20 02:47:11 +01:00
parent 06924213d2
commit edc55a5b0d
13 changed files with 1422 additions and 348 deletions

View File

@@ -1,97 +1,242 @@
"""DNS Authenticator for Core Networks DNS."""
import logging
import zope.interface
from lexicon.providers import corenetworks
import logging
import requests
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins import dns_common_lexicon
logger = logging.getLogger(__name__)
ACCOUNT_URL = 'https://beta.api.core-networks.de/doc/#functon_auth_token'
ACCOUNT_URL = "https://beta.api.core-networks.de/doc/#functon_auth_token"
API_BASE_URL = "https://beta.api.core-networks.de"
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for Core Networks
This Authenticator uses the Core Networks beta API to fulfill a dns-01 challenge.
"""
description = 'Obtain certificates using a DNS TXT record (if you are using Core Networks for DNS).'
description = "Obtain certificates using a DNS TXT record (if you are using Core Networks for DNS)."
ttl = 60
def __init__(self, *args, **kwargs):
# l = open('corenetworks.log', 'a')
# for arg in args:
# l.write("Arg: %s\n" % arg)
# for kwarg in kwargs:
# l.write("KWarg: %s\n" % kwarg)
# l.write(str(config))
# l.write("Authenticator instantiated with args %s and kwargs %s" % (*args, **kwargs))
# l.close()
super(Authenticator, self).__init__(*args, **kwargs)
self.credentials = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super(Authenticator, cls).add_parser_arguments(add, default_propagation_seconds=30)
add('credentials', help='Core Networks credentials INI file.')
def add_parser_arguments(cls, add):
super(Authenticator, cls).add_parser_arguments(
add, default_propagation_seconds=30
)
add("credentials", help="Core Networks credentials INI file.")
def more_info(self): # pylint: disable=missing-docstring,no-self-use
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the Core Networks beta API.'
def more_info(self):
return (
"This plugin configures a DNS TXT record to respond to a dns-01 challenge using "
+ "the Core Networks beta API."
)
def _setup_credentials(self):
self.credentials = self._configure_credentials(
'credentials',
'Core Networks credentials INI file',
"credentials",
"Core Networks credentials INI file",
{
'login': 'API user name for Core Networks beta API. (See {0}.)'.format(ACCOUNT_URL),
'password': 'Password for API user'
}
"login": "API user name for Core Networks beta API. (See {0}.)".format(
ACCOUNT_URL
),
"password": "Password for API user",
},
)
# l = open('corenetworks.log', 'a')
# l.write("_setup_credentials configured these credentials: %s\n" % str(self.credentials) )
# l.close()
def _perform(self, domain, validation_name, validation):
self._get_corenetworks_client().add_txt_record(domain, validation_name, validation)
self._get_corenetworks_client().add_txt_record(
domain, validation_name, validation
)
def _cleanup(self, domain, validation_name, validation):
self._get_corenetworks_client().del_txt_record(domain, validation_name, validation)
self._get_corenetworks_client().del_txt_record(
domain, validation_name, validation
)
def _get_corenetworks_client(self):
return _CoreNetworksLexiconClient(self.credentials.conf('login'), self.credentials.conf('password'), self.ttl)
return _CoreNetworksAPIClient(
self.credentials.conf("login"), self.credentials.conf("password"), self.ttl
)
class _CoreNetworksLexiconClient(dns_common_lexicon.LexiconClient):
class _CoreNetworksAPIClient:
"""
Encapsulates all communication with the Core Networks API via Lexicon.
Encapsulates all communication with the Core Networks API.
"""
def __init__(self, login, password, ttl):
super(_CoreNetworksLexiconClient, self).__init__()
# l = open('corenetworks.log', 'a')
# l.write("_CoreNetworksLexiconClient instantiated with login {0} and password {1}".format(login, password))
# l.close()
config = dns_common_lexicon.build_lexicon_config('corenetworks', {
'ttl': ttl,
}, {
'auth_username': login,
'auth_password': password
})
self.login = login
self.password = password
self.ttl = ttl
self.token = None
self.provider = corenetworks.Provider(config)
def _authenticate(self):
"""Authenticate with Core Networks API and get token."""
if self.token:
return self.token
def _handle_http_error(self, e, domain_name):
hint = None
if str(e).startswith('401 Client Error: Unauthorized for url:'):
hint = 'Is your API token value correct?'
try:
response = requests.post(
f"{API_BASE_URL}/auth/token",
json={"login": self.login, "password": self.password},
timeout=30,
)
response.raise_for_status()
data = response.json()
self.token = data["token"]
logger.debug("Successfully authenticated with Core Networks API")
return self.token
except requests.exceptions.RequestException as e:
raise errors.PluginError(
f"Failed to authenticate with Core Networks API: {e}"
)
return errors.PluginError('Error determining zone identifier for {0}: {1}.{2}'
.format(domain_name, e, ' ({0})'.format(hint) if hint else ''))
def _get_headers(self):
"""Get headers with authentication token."""
token = self._authenticate()
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
def _find_zone(self, domain):
"""
Find the DNS zone for the given domain.
:param str domain: The domain to find the zone for
:returns: The zone name
:rtype: str
"""
try:
response = requests.get(
f"{API_BASE_URL}/dnszones/", headers=self._get_headers(), timeout=30
)
response.raise_for_status()
zones = response.json()
# Find the longest matching zone
domain_parts = domain.rstrip(".").split(".")
for i in range(len(domain_parts)):
potential_zone = ".".join(domain_parts[i:])
for zone in zones:
if zone["name"] == potential_zone:
logger.debug(f"Found zone {potential_zone} for domain {domain}")
return potential_zone
raise errors.PluginError(
f"Could not find DNS zone for domain {domain}. "
f"Available zones: {', '.join([z['name'] for z in zones])}"
)
except requests.exceptions.RequestException as e:
raise errors.PluginError(f"Failed to list DNS zones: {e}")
def _get_relative_name(self, record_name, zone):
"""
Get the relative record name for the zone.
:param str record_name: The full record name (FQDN)
:param str zone: The zone name
:returns: The relative record name
:rtype: str
"""
# Remove trailing dots
record_name = record_name.rstrip(".")
zone = zone.rstrip(".")
# If record_name equals zone, return "@" for apex
if record_name == zone:
return "@"
# Remove zone from record_name to get relative name
if record_name.endswith("." + zone):
return record_name[: -(len(zone) + 1)]
# If it doesn't end with the zone, it might already be relative
return record_name
def add_txt_record(self, domain, record_name, record_content):
"""
Add a TXT record using the supplied information.
:param str domain: The domain to use to look up the managed zone.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
:raises errors.PluginError: if an error occurs communicating with the Core Networks API
"""
zone = self._find_zone(domain)
relative_name = self._get_relative_name(record_name, zone)
try:
# Add the TXT record
response = requests.post(
f"{API_BASE_URL}/dnszones/{zone}/records/",
headers=self._get_headers(),
json={
"name": relative_name,
"type": "TXT",
"data": record_content,
"ttl": self.ttl,
},
timeout=30,
)
response.raise_for_status()
logger.info(f"Successfully added TXT record {relative_name} to zone {zone}")
# Commit the changes
commit_response = requests.post(
f"{API_BASE_URL}/dnszones/{zone}/records/commit",
headers=self._get_headers(),
timeout=30,
)
commit_response.raise_for_status()
logger.info(f"Successfully committed changes to zone {zone}")
except requests.exceptions.RequestException as e:
raise errors.PluginError(f"Error adding TXT record to Core Networks: {e}")
def del_txt_record(self, domain, record_name, record_content):
"""
Delete a TXT record using the supplied information.
:param str domain: The domain to use to look up the managed zone.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
:raises errors.PluginError: if an error occurs communicating with the Core Networks API
"""
zone = self._find_zone(domain)
relative_name = self._get_relative_name(record_name, zone)
try:
# Delete the TXT record
response = requests.post(
f"{API_BASE_URL}/dnszones/{zone}/records/delete",
headers=self._get_headers(),
json={
"name": relative_name,
"type": "TXT",
"data": record_content,
},
timeout=30,
)
response.raise_for_status()
logger.info(
f"Successfully deleted TXT record {relative_name} from zone {zone}"
)
# Commit the changes
commit_response = requests.post(
f"{API_BASE_URL}/dnszones/{zone}/records/commit",
headers=self._get_headers(),
timeout=30,
)
commit_response.raise_for_status()
logger.info(f"Successfully committed changes to zone {zone}")
except requests.exceptions.RequestException as e:
raise errors.PluginError(
f"Error deleting TXT record from Core Networks: {e}"
)

View File

@@ -2,50 +2,214 @@
import os
import unittest
from unittest import mock
import mock
from requests.exceptions import HTTPError
from certbot import errors
from certbot.plugins import dns_test_common
from certbot.plugins import dns_test_common_lexicon
from certbot.tests import util as test_util
LOGIN = 'foo'
PASSWORD = 'bar'
LOGIN = "foo"
PASSWORD = "bar"
class AuthenticatorTest(test_util.TempDirTestCase,
dns_test_common_lexicon.BaseLexiconAuthenticatorTest):
class AuthenticatorTest(test_util.TempDirTestCase):
def setUp(self):
super(AuthenticatorTest, self).setUp()
from certbot_dns_corenetworks.dns_corenetworks import Authenticator
path = os.path.join(self.tempdir, 'file.ini')
dns_test_common.write({"corenetworks_login": LOGIN, "corenetworks_password": PASSWORD }, path)
path = os.path.join(self.tempdir, "file.ini")
dns_test_common.write(
{"corenetworks_login": LOGIN, "corenetworks_password": PASSWORD}, path
)
self.config = mock.MagicMock(corenetworks_credentials=path,
corenetworks_propagation_seconds=0) # don't wait during tests
self.config = mock.MagicMock(
corenetworks_credentials=path,
corenetworks_propagation_seconds=0, # don't wait during tests
)
self.auth = Authenticator(self.config, "corenetworks")
self.mock_client = mock.MagicMock()
# _get_corenetworks_client | pylint: disable=protected-access
self.auth._get_corenetworks_client = mock.MagicMock(return_value=self.mock_client)
self.auth._get_corenetworks_client = mock.MagicMock(
return_value=self.mock_client
)
@mock.patch("certbot.display.util.notify")
def test_perform(self, mock_notify):
self.auth.perform([self.achall])
expected = [
mock.call.add_txt_record(
"example.com", "_acme-challenge.example.com", mock.ANY
)
]
self.assertEqual(expected, self.mock_client.mock_calls)
def test_cleanup(self):
self.auth._attempt_cleanup = True # pylint: disable=protected-access
self.auth.cleanup([self.achall])
expected = [
mock.call.del_txt_record(
"example.com", "_acme-challenge.example.com", mock.ANY
)
]
self.assertEqual(expected, self.mock_client.mock_calls)
@property
def achall(self):
from acme import challenges
domain = "example.com"
key = challenges.KeyAuthorizationChallengeResponse(
key_authorization="foo"
).key_authorization
return mock.MagicMock(
domain=domain,
validation_domain_name=lambda x: "_acme-challenge." + domain,
validation=lambda x: key,
)
class CoreNetworksLexiconClientTest(unittest.TestCase, dns_test_common_lexicon.BaseLexiconClientTest):
LOGIN_ERROR = HTTPError('401 Client Error: Unauthorized for url: ...')
class CoreNetworksAPIClientTest(unittest.TestCase):
def setUp(self):
from certbot_dns_corenetworks.dns_corenetworks import _CoreNetworksLexiconClient
from certbot_dns_corenetworks.dns_corenetworks import _CoreNetworksAPIClient
self.client = _CoreNetworksLexiconClient(LOGIN, PASSWORD, 0)
self.client = _CoreNetworksAPIClient(LOGIN, PASSWORD, 60)
self.provider_mock = mock.MagicMock()
self.client.provider = self.provider_mock
@mock.patch("certbot_dns_corenetworks.dns_corenetworks.requests.post")
def test_authenticate(self, mock_post):
mock_response = mock.MagicMock()
mock_response.json.return_value = {"token": "test-token", "expires": 3600}
mock_post.return_value = mock_response
token = self.client._authenticate()
self.assertEqual(token, "test-token")
self.assertEqual(self.client.token, "test-token")
mock_post.assert_called_once()
@mock.patch("certbot_dns_corenetworks.dns_corenetworks.requests.post")
def test_authenticate_error(self, mock_post):
import requests
mock_post.side_effect = requests.exceptions.RequestException("API Error")
with self.assertRaises(errors.PluginError) as context:
self.client._authenticate()
self.assertIn("Failed to authenticate", str(context.exception))
@mock.patch("certbot_dns_corenetworks.dns_corenetworks.requests.get")
@mock.patch("certbot_dns_corenetworks.dns_corenetworks.requests.post")
def test_find_zone(self, mock_post, mock_get):
# Mock authentication
mock_auth_response = mock.MagicMock()
mock_auth_response.json.return_value = {"token": "test-token", "expires": 3600}
mock_post.return_value = mock_auth_response
# Mock zone list
mock_zone_response = mock.MagicMock()
mock_zone_response.json.return_value = [
{"name": "example.com", "type": "master"},
{"name": "test.com", "type": "master"},
]
mock_get.return_value = mock_zone_response
zone = self.client._find_zone("sub.example.com")
self.assertEqual(zone, "example.com")
@mock.patch("certbot_dns_corenetworks.dns_corenetworks.requests.post")
@mock.patch("certbot_dns_corenetworks.dns_corenetworks.requests.get")
def test_add_txt_record(self, mock_get, mock_post):
# Mock authentication
mock_auth_response = mock.MagicMock()
mock_auth_response.json.return_value = {"token": "test-token", "expires": 3600}
# Mock zone list
mock_zone_response = mock.MagicMock()
mock_zone_response.json.return_value = [
{"name": "example.com", "type": "master"}
]
# Mock add record response
mock_add_response = mock.MagicMock()
# Mock commit response
mock_commit_response = mock.MagicMock()
mock_get.return_value = mock_zone_response
mock_post.side_effect = [
mock_auth_response,
mock_add_response,
mock_commit_response,
]
self.client.add_txt_record(
"example.com", "_acme-challenge.example.com", "test-validation"
)
# Verify authentication, add record, and commit were called
self.assertEqual(mock_post.call_count, 3)
@mock.patch("certbot_dns_corenetworks.dns_corenetworks.requests.post")
@mock.patch("certbot_dns_corenetworks.dns_corenetworks.requests.get")
def test_del_txt_record(self, mock_get, mock_post):
# Mock authentication
mock_auth_response = mock.MagicMock()
mock_auth_response.json.return_value = {"token": "test-token", "expires": 3600}
# Mock zone list
mock_zone_response = mock.MagicMock()
mock_zone_response.json.return_value = [
{"name": "example.com", "type": "master"}
]
# Mock delete record response
mock_del_response = mock.MagicMock()
# Mock commit response
mock_commit_response = mock.MagicMock()
mock_get.return_value = mock_zone_response
mock_post.side_effect = [
mock_auth_response,
mock_del_response,
mock_commit_response,
]
self.client.del_txt_record(
"example.com", "_acme-challenge.example.com", "test-validation"
)
# Verify authentication, delete record, and commit were called
self.assertEqual(mock_post.call_count, 3)
def test_get_relative_name(self):
# Test apex domain
self.assertEqual(
self.client._get_relative_name("example.com", "example.com"), "@"
)
# Test subdomain
self.assertEqual(
self.client._get_relative_name("www.example.com", "example.com"), "www"
)
# Test with trailing dots
self.assertEqual(
self.client._get_relative_name("www.example.com.", "example.com."), "www"
)
# Test ACME challenge
self.assertEqual(
self.client._get_relative_name(
"_acme-challenge.example.com", "example.com"
),
"_acme-challenge",
)
if __name__ == "__main__":
unittest.main() # pragma: no cover
unittest.main()