13
13
from celery import shared_task
14
14
from celery .exceptions import SoftTimeLimitExceeded
15
15
from celery .utils .log import get_task_logger
16
- from cryptography .hazmat .backends import default_backend
17
16
from cryptography .hazmat .backends .openssl import rsa
18
17
from cryptography .hazmat .primitives import hashes
18
+ from cryptography .hazmat .primitives .serialization import Encoding
19
19
from cryptography .hazmat .primitives .asymmetric import dsa , x25519 , x448 , ec
20
20
from cryptography .hazmat .primitives .asymmetric .ec import EllipticCurvePublicKey
21
21
from cryptography .x509 import (
@@ -1034,7 +1034,7 @@ def build_summary_report(testtls, category):
1034
1034
testtls .report = report
1035
1035
1036
1036
1037
- def dane (url , port , chain , task , dane_cb_data , score_none , score_none_bogus , score_failed , score_validated ):
1037
+ def dane (url : str , port : int , chain : List [ Certificate ] , task , dane_cb_data , score_none , score_none_bogus , score_failed , score_validated ):
1038
1038
"""
1039
1039
Check if there are TLSA records, if they are valid and if a DANE rollover
1040
1040
scheme is currently in place.
@@ -1108,9 +1108,8 @@ def dane(url, port, chain, task, dane_cb_data, score_none, score_none_bogus, sco
1108
1108
1109
1109
chain_pem = []
1110
1110
for cert in chain :
1111
- chain_pem .append (cert .as_pem ( ))
1111
+ chain_pem .append (cert .public_bytes ( Encoding . PEM ). decode ( "ascii" ))
1112
1112
chain_txt = "\n " .join (chain_pem )
1113
- res = None
1114
1113
with subprocess .Popen (
1115
1114
[
1116
1115
settings .LDNS_DANE ,
@@ -1191,74 +1190,6 @@ def get_common_name(cert):
1191
1190
pass
1192
1191
return value
1193
1192
1194
-
1195
- class DebugCertChain :
1196
- """
1197
- Class performing X509 cert checks NCSC Guidelines B3-*
1198
-
1199
- """
1200
-
1201
- def __new__ (cls , chain ):
1202
- """
1203
- In case the chain is None (ValueError from nassl) don't create an
1204
- instance. Instead return None and it will be handled during the
1205
- certificate checks.
1206
-
1207
- """
1208
- if chain is None :
1209
- return None
1210
- return super ().__new__ (cls )
1211
-
1212
- def __init__ (self , chain ):
1213
- self .unparsed_chain = chain
1214
- self .chain = [
1215
- load_pem_x509_certificate (cert .as_pem ().encode ("ascii" ), backend = default_backend ()) for cert in chain
1216
- ]
1217
- self .score_hostmatch_good = scoring .WEB_TLS_HOSTMATCH_GOOD
1218
- self .score_hostmatch_bad = scoring .WEB_TLS_HOSTMATCH_BAD
1219
- self .score_pubkey_good = scoring .WEB_TLS_PUBKEY_GOOD
1220
- self .score_pubkey_bad = scoring .WEB_TLS_PUBKEY_BAD
1221
- self .score_signature_good = scoring .WEB_TLS_SIGNATURE_GOOD
1222
- self .score_signature_bad = scoring .WEB_TLS_SIGNATURE_BAD
1223
- self .score_dane_none = scoring .WEB_TLS_DANE_NONE
1224
- self .score_dane_none_bogus = scoring .WEB_TLS_DANE_NONE_BOGUS
1225
- self .score_dane_failed = scoring .WEB_TLS_DANE_FAILED
1226
- self .score_dane_validated = scoring .WEB_TLS_DANE_VALIDATED
1227
-
1228
- def check_dane (self , url , port , task , dane_cb_data = None ):
1229
- return dane (
1230
- url ,
1231
- port ,
1232
- self .unparsed_chain ,
1233
- task ,
1234
- dane_cb_data ,
1235
- self .score_dane_none ,
1236
- self .score_dane_none_bogus ,
1237
- self .score_dane_failed ,
1238
- self .score_dane_validated ,
1239
- )
1240
-
1241
-
1242
- class DebugCertChainMail (DebugCertChain ):
1243
- """
1244
- Subclass of DebugCertChain to define the scores used for the mailtest.
1245
-
1246
- """
1247
-
1248
- def __init__ (self , chain ):
1249
- super ().__init__ (chain )
1250
- self .score_hostmatch_good = scoring .MAIL_TLS_HOSTMATCH_GOOD
1251
- self .score_hostmatch_bad = scoring .MAIL_TLS_HOSTMATCH_BAD
1252
- self .score_pubkey_good = scoring .MAIL_TLS_PUBKEY_GOOD
1253
- self .score_pubkey_bad = scoring .MAIL_TLS_PUBKEY_BAD
1254
- self .score_signature_good = scoring .MAIL_TLS_SIGNATURE_GOOD
1255
- self .score_signature_bad = scoring .MAIL_TLS_SIGNATURE_BAD
1256
- self .score_dane_none = scoring .MAIL_TLS_DANE_NONE
1257
- self .score_dane_none_bogus = scoring .MAIL_TLS_DANE_NONE_BOGUS
1258
- self .score_dane_failed = scoring .MAIL_TLS_DANE_FAILED
1259
- self .score_dane_validated = scoring .MAIL_TLS_DANE_VALIDATED
1260
-
1261
-
1262
1193
def do_web_cert (af_ip_pairs , url , task , * args , ** kwargs ):
1263
1194
"""
1264
1195
Check the web server's certificate.
@@ -1282,6 +1213,11 @@ def cert_checks(url, mode, task, af_ip_pair=None, dane_cb_data=None, *args, **kw
1282
1213
Perform certificate checks.
1283
1214
1284
1215
"""
1216
+ # TODO: common property?
1217
+ ports = {
1218
+ ChecksMode .WEB : 443 ,
1219
+ ChecksMode .MAIL : 25 ,
1220
+ }
1285
1221
# TODO: this does use our trust store
1286
1222
if mode == ChecksMode .WEB :
1287
1223
print (f"starting sslyze scan for { url } { af_ip_pair [1 ]} { dane_cb_data } " )
@@ -1379,11 +1315,11 @@ def cert_checks(url, mode, task, af_ip_pair=None, dane_cb_data=None, *args, **kw
1379
1315
for cert in cert_deployment .received_certificate_chain :
1380
1316
chain_str .append (get_common_name (cert ))
1381
1317
1382
- # TODO: DANE
1383
- # if starttls_details:
1384
- # dane_results = debug_chain.check_dane(url, conn_port, task, dane_cb_data=starttls_details.dane_cb_data)
1385
- # else:
1386
- # dane_results = debug_chain.check_dane(url, conn_port, task )
1318
+ dane_results = dane ( url , ports [ mode ], cert_deployment . received_certificate_chain , task ,
1319
+ dane_cb_data , scoring . WEB_TLS_DANE_NONE ,
1320
+ scoring . WEB_TLS_DANE_NONE_BOGUS ,
1321
+ scoring . WEB_TLS_DANE_FAILED ,
1322
+ scoring . WEB_TLS_DANE_VALIDATED )
1387
1323
1388
1324
results = dict (
1389
1325
tls_cert = True ,
@@ -1398,7 +1334,7 @@ def cert_checks(url, mode, task, af_ip_pair=None, dane_cb_data=None, *args, **kw
1398
1334
hostmatch_bad = hostmatch_bad ,
1399
1335
hostmatch_score = hostmatch_score ,
1400
1336
)
1401
- # results.update(dane_results)
1337
+ results .update (dane_results )
1402
1338
1403
1339
return results
1404
1340
0 commit comments