-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDnsListener.py
92 lines (79 loc) · 3.32 KB
/
DnsListener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import dns.message
import dns.query
import dns.rcode
import dns.resolver
import socketserver
import threading
import datetime
# DNS server settings
LISTEN_ADDRESS = '0.0.0.0' # Address to listen on (all network interfaces)
LISTEN_PORT = 53 # Port to listen on
UPSTREAM_DNS = '8.8.8.8' # Upstream DNS server to forward valid requests
# Blacklisted domains
BLACKLIST = ['youtube', 'youtubekids', 'youtubei']
# Whitelisted IP addresses
WHITELIST = ['192.168.1.2', '192.168.1.3']
# Block period (from 4PM to 8AM next day)
BLOCK_START_TIME = datetime.time(13, 0) # 1:00 PM
BLOCK_END_TIME = datetime.time(8, 0) # 8:00 AM
def log_request(domain, ip):
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"{timestamp} - Domain: {domain}, IP: {ip}\n"
with open("dns_log.txt", "a") as log_file:
log_file.write(log_entry)
def is_block_period(client_ip):
current_time = datetime.datetime.now().time()
# Check if the client IP is whitelisted
if client_ip in WHITELIST:
return False
# Check if it's within the block period
return BLOCK_START_TIME <= current_time or current_time < BLOCK_END_TIME
class DNSRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
# Get the client IP address
client_ip = self.client_address[0]
query = dns.message.from_wire(self.request[0])
domain = str(query.question[0].name)
# Handle the DNS request only if it's not within the block period
if not is_block_period(client_ip):
# Print the domain being requested
print(datetime.datetime.now(), " : ", client_ip, " : ", domain,)
# Log the request to a file
log_request(domain, client_ip)
# Create a response message
response = dns.message.make_response(query)
# Check if the domain is in the blacklist
if any(word in domain for word in BLACKLIST) and client_ip not in WHITELIST:
print("BLACKLISTED")
# Set the response code to indicate non-existent domain
response.set_rcode(dns.rcode.NXDOMAIN)
# Set an empty answer section to indicate "Not found" response
response.answer = []
else:
# Forward the request to the upstream DNS server
response = dns.query.tcp(query, UPSTREAM_DNS)
# Send the DNS response back to the client
self.request[1].sendto(response.to_wire(), self.client_address)
else:
print(datetime.datetime.now(), " : ", client_ip, " : ", domain, " Time Block")
# Create a DNS server instance
dns_server = socketserver.ThreadingUDPServer((LISTEN_ADDRESS, LISTEN_PORT), DNSRequestHandler)
# Start the DNS server in a separate thread
dns_server_thread = threading.Thread(target=dns_server.serve_forever)
dns_server_thread.start()
print(f'DNS server started on {LISTEN_ADDRESS}:{LISTEN_PORT}')
print(f'Upstream DNS server: {UPSTREAM_DNS}')
print('Blacklisted domains:')
for domain in BLACKLIST:
print(f'- {domain}')
print('Whitelisted IP addresses:')
for ip in WHITELIST:
print(f'- {ip}')
# Keep the main thread running until interrupted
try:
while True:
pass
except KeyboardInterrupt:
# Stop the DNS server and join the thread
dns_server.shutdown()
dns_server_thread.join()