Skip to content

Commit

Permalink
fix some linting things
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsasha committed Dec 20, 2023
1 parent ed54a6b commit 6a8265d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 24 deletions.
1 change: 1 addition & 0 deletions checks/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

DEFAULT_TIMEOUT = 10


def _do_request(args, headers, kwargs, session, url):
"""
This small wrapper helps with handling of redirects.
Expand Down
74 changes: 50 additions & 24 deletions checks/tasks/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from cryptography.x509 import (
NameOID,
SignatureAlgorithmOID,
load_pem_x509_certificate, Certificate,
Certificate,
)
from django.conf import settings
from django.db import transaction
Expand All @@ -32,10 +32,14 @@
ServerScanStatusEnum,
ScanCommand,
TlsVersionEnum,
CipherSuiteAcceptedByServer, ServerNetworkConfiguration, ProtocolWithOpportunisticTlsEnum,
CipherSuiteAcceptedByServer,
ServerNetworkConfiguration,
ProtocolWithOpportunisticTlsEnum,
)
from sslyze.plugins.certificate_info._certificate_utils import (
parse_subject_alternative_name_extension,
get_common_names,
)
from sslyze.plugins.certificate_info._certificate_utils import parse_subject_alternative_name_extension, \
get_common_names

from checks import categories, scoring
from checks.http_client import http_get_ip
Expand Down Expand Up @@ -69,6 +73,7 @@
results_per_domain,
)
from interface import batch, batch_shared_task, redis_id

# Workaround for https://github.com/eventlet/eventlet/issues/413 for eventlet
# while monkey patching. That way we can still catch subprocess.TimeoutExpired
# instead of just Exception which may intervene with Celery's own exceptions.
Expand Down Expand Up @@ -960,7 +965,17 @@ def build_summary_report(testtls, category):
testtls.report = report


def dane(url: str, port: int, chain: List[Certificate], task, dane_cb_data, score_none, score_none_bogus, score_failed, score_validated):
def dane(
url: str,
port: int,
chain: List[Certificate],
task,
dane_cb_data,
score_none,
score_none_bogus,
score_failed,
score_validated,
):
"""
Check if there are TLSA records, if they are valid and if a DANE rollover
scheme is currently in place.
Expand Down Expand Up @@ -1116,6 +1131,7 @@ def get_common_name(cert):
pass
return value


def do_web_cert(af_ip_pairs, url, task, *args, **kwargs):
"""
Check the web server's certificate.
Expand Down Expand Up @@ -1151,8 +1167,9 @@ def cert_checks(url, mode, task, af_ip_pair=None, dane_cb_data=None, *args, **kw
port = 25
scan = ServerScanRequest(
server_location=ServerNetworkLocation(hostname=url, port=port),
network_configuration=ServerNetworkConfiguration(tls_server_name_indication=url,
tls_opportunistic_encryption=ProtocolWithOpportunisticTlsEnum.SMTP),
network_configuration=ServerNetworkConfiguration(
tls_server_name_indication=url, tls_opportunistic_encryption=ProtocolWithOpportunisticTlsEnum.SMTP
),
scan_commands={ScanCommand.CERTIFICATE_INFO},
)
else:
Expand Down Expand Up @@ -1218,7 +1235,6 @@ def cert_checks(url, mode, task, af_ip_pair=None, dane_cb_data=None, *args, **kw
}
hostmatch_bad = certificate_names


pubkey_score, pubkey_bad, pubkey_phase_out = check_pubkey(cert_deployment.received_certificate_chain)

# NCSC guideline B3-2
Expand All @@ -1230,13 +1246,13 @@ def cert_checks(url, mode, task, af_ip_pair=None, dane_cb_data=None, *args, **kw
sigalg = cert.signature_algorithm_oid
# Check oids
if sigalg not in (
SignatureAlgorithmOID.RSA_WITH_SHA256,
SignatureAlgorithmOID.RSA_WITH_SHA384,
SignatureAlgorithmOID.RSA_WITH_SHA512,
SignatureAlgorithmOID.ECDSA_WITH_SHA256,
SignatureAlgorithmOID.ECDSA_WITH_SHA384,
SignatureAlgorithmOID.ECDSA_WITH_SHA512,
SignatureAlgorithmOID.DSA_WITH_SHA256,
SignatureAlgorithmOID.RSA_WITH_SHA256,
SignatureAlgorithmOID.RSA_WITH_SHA384,
SignatureAlgorithmOID.RSA_WITH_SHA512,
SignatureAlgorithmOID.ECDSA_WITH_SHA256,
SignatureAlgorithmOID.ECDSA_WITH_SHA384,
SignatureAlgorithmOID.ECDSA_WITH_SHA512,
SignatureAlgorithmOID.DSA_WITH_SHA256,
):
sigalg_bad[get_common_name(cert)] = sigalg._name
sigalg_score = scoring.WEB_TLS_SIGNATURE_BAD
Expand All @@ -1245,11 +1261,17 @@ def cert_checks(url, mode, task, af_ip_pair=None, dane_cb_data=None, *args, **kw
for cert in cert_deployment.received_certificate_chain:
chain_str.append(get_common_name(cert))

dane_results = dane(url, port, cert_deployment.received_certificate_chain, task,
dane_cb_data, scoring.WEB_TLS_DANE_NONE,
scoring.WEB_TLS_DANE_NONE_BOGUS,
scoring.WEB_TLS_DANE_FAILED,
scoring.WEB_TLS_DANE_VALIDATED)
dane_results = dane(
url,
port,
cert_deployment.received_certificate_chain,
task,
dane_cb_data,
scoring.WEB_TLS_DANE_NONE,
scoring.WEB_TLS_DANE_NONE_BOGUS,
scoring.WEB_TLS_DANE_FAILED,
scoring.WEB_TLS_DANE_VALIDATED,
)

results = dict(
tls_cert=True,
Expand Down Expand Up @@ -1288,11 +1310,13 @@ def check_pubkey(certificates: List[Certificate]):
elif public_key_type is dsa.DSAPublicKey and bits < 2048:
failed_key_type = public_key_type.__name__
# TODO: DH type?
#elif public_key_type is DHPublicKey and bits < 2048:
# elif public_key_type is DHPublicKey and bits < 2048:
# failed_key_type = "DHPublicKey"
elif public_key_type in [x25519.X25519PublicKey, x448.X448PublicKey] and bits < 224:
failed_key_type = public_key_type.__name__
elif public_key_type is EllipticCurvePublicKey and (bits < 224 or public_key.curve not in [ec.SECP384R1, ec.SECP256R1]):
elif public_key_type is EllipticCurvePublicKey and (
bits < 224 or public_key.curve not in [ec.SECP384R1, ec.SECP256R1]
):
failed_key_type = public_key_type.__name__
if failed_key_type:
message = f"{common_name}: {failed_key_type}-{bits} bits"
Expand All @@ -1305,6 +1329,7 @@ def check_pubkey(certificates: List[Certificate]):
pubkey_score = scoring.WEB_TLS_PUBKEY_BAD
return pubkey_score, bad_pubkey, phase_out_pubkey


def do_web_conn(af_ip_pairs, url, *args, **kwargs):
"""
Start all the TLS related checks for the web test.
Expand Down Expand Up @@ -1386,7 +1411,9 @@ def check_mail_tls(server, dane_cb_data, task):
scans = [
ServerScanRequest(
server_location=ServerNetworkLocation(hostname=server, port=25),
network_configuration=ServerNetworkConfiguration(tls_server_name_indication=server, tls_opportunistic_encryption=ProtocolWithOpportunisticTlsEnum.SMTP),
network_configuration=ServerNetworkConfiguration(
tls_server_name_indication=server, tls_opportunistic_encryption=ProtocolWithOpportunisticTlsEnum.SMTP
),
scan_commands={
# ScanCommand.CERTIFICATE_INFO,
ScanCommand.SSL_2_0_CIPHER_SUITES,
Expand Down Expand Up @@ -1500,7 +1527,6 @@ def has_daneTA(tlsa_records):
return False



def check_web_tls(url, af_ip_pair=None, *args, **kwargs):
"""
Check the webserver's TLS configuration.
Expand Down

0 comments on commit 6a8265d

Please sign in to comment.