Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

class-based rewrite #4

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 166 additions & 142 deletions SPFlatten.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#!/usr/bin/env python3

import dns.resolver
import argparse
import logging
import re
import sys
import dns.resolver

# -----------------------------------------------------
# https://github.com/0x9090/SPFlatten
# SPFlattener - Because who needs limits??
# Requires: dnspython
# Usage: SPFlatten.py yourdmomain.com and-optional-others.net etc.org
Expand All @@ -13,157 +15,179 @@
# Confirm that SPF doesn't follow CNAMES (I don't think it does)
# Should we consider Sender ID? ie spf2.0 (probably not)

# ---------------------------------
if len(sys.argv) <= 1:
sys.exit(1)

isDebug = 0
root_domain = ''
all_mechanism = ''
spf_nonflat_mechanisms = []
spf_ip_list = []

root_domains = sys.argv[1:]


def debug(*args, **kwargs):
global isDebug
if isDebug:
print(*args, **kwargs)


# ---------------------------------

def main():
global all_mechanism
global root_domain
global spf_nonflat_mechanisms
global spf_ip_list

for root_domain in root_domains:
all_mechanism = ''
spf_nonflat_mechanisms = []
spf_ip_list = []

flatten_spf(root_domain)

dedupe_spf_ip_list = list(set(spf_ip_list))

logger = logging.getLogger('spflat')

class Flattener:
def __init__(self, domain=None, root_domain=None):
self.all_mechanism = ''
self.spf_nonflat_mechanisms = []
self.spf_ip_list = []
self.domain = domain
self.root_domain = root_domain

def dump(self):
'''
Deduplicate (possibly nested) records
'''
dedupe_spf_ip_list = list(set(self.spf_ip_list))
flat_spf = "v=spf1"
for ip in dedupe_spf_ip_list:
if re.match(r'.*:.*', ip):
flat_spf += (" ip6:" + ip)
else:
flat_spf += (" ip4:" + ip)

for mechanism in spf_nonflat_mechanisms:
for mechanism in self.spf_nonflat_mechanisms:
flat_spf += " " + mechanism
flat_spf += self.all_mechanism

logger.info("%s flattened SPF", self.domain)
print(flat_spf)

def flatten_domain(self):
logger.debug("Flattening: %s", self.domain)
try:
txt_records = dns.resolver.resolve(self.domain, "TXT")
except dns.exception.DNSException:
logger.debug("No TXT records for: %s", self.domain)
return

for record in txt_records:
logger.debug("TXT record for: %s:%s", self.domain, str(record))
joinrecord = ''.join([x for x in str(record).split('"') if x.strip()])
fields = joinrecord.split(' ')

if re.match(r'v=spf1', fields[0]):
self.flatten_record(joinrecord)

# Recursively flatten the SPF record for the specified domain
def flatten_record(self, record):
for field in record.split(' '):
self.parse_mechanism(field)

# Parse the given mechansim, and dispatch it accordintly
def parse_mechanism(self, mechanism):
if re.match(r'^a$', mechanism):
self.spf_ip_list.extend(Flattener.convert_domain_to_ipv4(self.domain))
elif re.match(r'^mx$', mechanism):
logger.debug("MX found for %s:%s", self.domain, mechanism)
self.spf_ip_list.extend(Flattener.convert_mx_to_ipv4(self.domain))
elif re.match(r'^a:.*$', mechanism):
match = re.match(r'^a:(.*)$', mechanism)
self.spf_ip_list.extend(Flattener.convert_domain_to_ipv4(match.group(1)))
elif re.match(r'^ip4:.*$', mechanism):
match = re.match(r'^ip4:(.*)$', mechanism)
logger.debug("IPv4 address found for %s:%s", self.domain, match.group(1))
self.spf_ip_list.append(match.group(1))
elif re.match(r'^ip6:.*$', mechanism):
match = re.match(r'^ip6:(.*)$', mechanism)
logger.debug("IPv6 address found for %s:%s", self.domain, match.group(1))
self.spf_ip_list.append(match.group(1))
elif re.match(r'^ptr.*$', mechanism):
logger.debug("PTR found for %s:%s", self.domain, mechanism)
self.spf_nonflat_mechanisms.append(mechanism)
elif re.match(r'^exists:$', mechanism):
logger.debug("Exists found for %s:%s", self.domain, mechanism)
self.spf_nonflat_mechanisms.append(mechanism)
elif re.match(r'^redirect(?:[=:]) ?(.*)$', mechanism):
logger.debug("Redirect found for %s:%s", self.domain, mechanism)
match = re.match(r'^redirect(?:[=:]) ?(.*)', mechanism)
newdom = Flattener(match.group(1), self.root_domain)
newdom.flatten_domain() # recursion
self.spf_nonflat_mechanisms.extend(newdom.spf_nonflat_mechanisms)
self.spf_ip_list.extend(newdom.spf_ip_list)
#
# self.spf_nonflat_mechanisms.append(mechanism)
# flatten_domain(re.match(r'^redirect(?:[\=\:])\ ?(.*)$', mechanism).group(1))
elif re.match(r'^exp:$', mechanism):
logger.debug("EXP found for %s:%s", self.domain, mechanism)
self.spf_nonflat_mechanisms.append(mechanism)
elif re.match(r'^.all$', mechanism):
if self.domain == self.root_domain or self.all_mechanism == '':
match = re.match(r'^(.all)$', mechanism)
logger.debug("All found for %s:%s", self.domain, match.group(1))
self.all_mechanism = " " + str(match.group(1))
elif re.match(r'^include:.*$', mechanism):
match = re.match(r'^include:(.*)', mechanism)
newdom = Flattener(match.group(1), self.root_domain)
newdom.flatten_domain() # recursion
self.spf_nonflat_mechanisms.extend(newdom.spf_nonflat_mechanisms)
self.spf_ip_list.extend(newdom.spf_ip_list)


# Convert A/AAAA records to IPs and adds them to the SPF master list
@staticmethod
def convert_domain_to_ipv4(domain):
if not domain:
logger.warning("Can't resolve \"a\" or \"mx\" mechanism without specifying a domain. Results will be partial")
return []

spf_ip_list = []
try:
a_records = dns.resolver.resolve(domain, "A")
for ip in a_records:
logger.debug("A record for %s:%s", domain, str(ip))
spf_ip_list.append(str(ip))
except dns.exception.DNSException:
pass

try:
aaaa_records = dns.resolver.resolve(domain, "AAAA")
for ip in aaaa_records:
logger.debug("A record for %s:%s", domain, str(ip))
spf_ip_list.append(str(ip))
except dns.exception.DNSException:
pass

return spf_ip_list

# Convert MX records to IPs and adds them to the SPF master list
@staticmethod
def convert_mx_to_ipv4(domain):
try:
mx_records = dns.resolver.resolve(domain, "MX")
except dns.exception.DNSException:
import pdb
pdb.set_trace()
return []

spf_ip_list = []
for record in mx_records:
mx = str(record).split(' ')
logger.debug("MX record found for %s:%s ", domain, mx[1])
spf_ip_list.extend(Flattener.convert_domain_to_ipv4(mx[1]))

flat_spf += all_mechanism

print("#### Flattened SPF for %s ####\n----------------------\n%s\n" % (root_domain, flat_spf))


# Recursively flatten the SPF record for the specified domain
def flatten_spf(domain):
global all_mechanism

debug("--- Flattening:", domain, "---")
try:
txt_records = dns.resolver.query(domain, "TXT")
except dns.exception.DNSException:
debug("No TXT records for:", domain)
return

for record in txt_records:
debug("TXT record for:", domain, ":", str(record))
joinrecord = ''.join([x for x in str(record).split('"') if x.strip()])
fields = joinrecord.split(' ')

if re.match(r'v=spf1', fields[0]):
for field in fields:
parse_mechanism(field, domain)


# Parse the given mechansim, and dispatch it accordintly
def parse_mechanism(mechanism, domain):
global all_mechanism

if re.match(r'^a$', mechanism):
convert_domain_to_ipv4(domain)
elif re.match(r'^mx$', mechanism):
debug("MX found for", root_domain, ":", mechanism)
convert_mx_to_ipv4(root_domain)
elif re.match(r'^a:.*$', mechanism):
match = re.match(r'^a:(.*)$', mechanism)
convert_domain_to_ipv4(match.group(1))
elif re.match(r'^ip4:.*$', mechanism):
match = re.match(r'^ip4:(.*)$', mechanism)
debug("IPv4 address found for", domain, ":", match.group(1))
spf_ip_list.append(match.group(1))
elif re.match(r'^ip6:.*$', mechanism):
match = re.match(r'^ip6:(.*)$', mechanism)
debug("IPv6 address found for", domain, ":", match.group(1))
spf_ip_list.append(match.group(1))
elif re.match(r'^ptr.*$', mechanism):
debug("PTR found for", domain, ":", mechanism)
spf_nonflat_mechanisms.append(mechanism)
elif re.match(r'^exists:$', mechanism):
debug("Exists found for", domain, ":", mechanism)
spf_nonflat_mechanisms.append(mechanism)
elif re.match(r'^redirect(?:[=:]) ?(.*)$', mechanism):
debug("Redirect found for", domain, ":", mechanism)
match = re.match(r'^redirect(?:[=:]) ?(.*)', mechanism)
flatten_spf(match.group(1)) # recursion
#
# spf_nonflat_mechanisms.append(mechanism)
# flatten_spf(re.match(r'^redirect(?:[\=\:])\ ?(.*)$', mechanism).group(1))
elif re.match(r'^exp:$', mechanism):
debug("EXP found for", domain, ":", mechanism)
spf_nonflat_mechanisms.append(mechanism)
elif re.match(r'^.all$', mechanism):
if domain == root_domain or all_mechanism == '':
match = re.match(r'^(.all)$', mechanism)
debug("All found for", domain, ":", match.group(1))
all_mechanism = " " + str(match.group(1))
elif re.match(r'^include:.*$', mechanism):
match = re.match(r'^include:(.*)', mechanism)
flatten_spf(match.group(1)) # recursion


# Convert A/AAAA records to IPs and adds them to the SPF master list
def convert_domain_to_ipv4(domain):
try:
a_records = dns.resolver.query(domain, "A")
for ip in a_records:
debug("A record for", domain, ":", str(ip))
spf_ip_list.append(str(ip))
except dns.exception.DNSException:
pass

try:
aaaa_records = dns.resolver.query(domain, "AAAA")
for ip in aaaa_records:
debug("A record for", domain, ":", str(ip))
spf_ip_list.append(str(ip))
except dns.exception.DNSException:
pass


# Convert MX records to IPs and adds them to the SPF master list
def convert_mx_to_ipv4(domain):
try:
mx_records = dns.resolver.query(domain, "MX")
except dns.exception.DNSException:
import pdb
pdb.set_trace()
return

for record in mx_records:
mx = str(record).split(' ')
debug("MX record found for ", domain, ": ", mx[1])
convert_domain_to_ipv4(mx[1])
return spf_ip_list


if __name__ == "__main__":
main()
parser = argparse.ArgumentParser(
description="Flatten SPF records",
formatter_class=argparse.RawTextHelpFormatter,
epilog='foo')
parser.add_argument('values', nargs='+',
help='One or more domains from which SPF will be fetched and flattened.')
parser.add_argument('-i', '--inlined', action='store_true',
help='If set, the argument is expect to be an inlined SPF records.')
parser.add_argument('-d', '--domain',
help='With -i, specify the domain in order to resolve "a" or "mx" records.')
parser.add_argument('-v', '--verbose', action='count', default=0,
help='More verbose')

args = parser.parse_args()

levels = [logging.WARNING, logging.INFO, logging.DEBUG]
level = levels[min(len(levels)-1, args.verbose)]
logging.basicConfig(level=level)

if args.inlined:
f = Flattener(args.domain)
f.flatten_record(args.values[0])
f.dump()
else:
for dom in args.values:
f = Flattener(dom)
f.flatten_domain()
f.dump()