analyze_crawl.py
from __future__ import division
import os
import sys
import sqlite3
from time import time
from os.path import join, basename, sep, isdir
from collections import defaultdict
import util
from db_schema import (HTTP_REQUESTS_TABLE,
HTTP_RESPONSES_TABLE,
JAVASCRIPT_TABLE, OPENWPM_TABLES)
from util import dump_as_json, get_table_and_column_names, get_crawl_dir,\
get_crawl_db_path
class CrawlDBAnalysis(object):
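    """Streaming analysis of an OpenWPM crawl SQLite database.

    Walks the HTTP requests, responses and javascript tables once,
    aggregates per-site counts and third-party domains, and dumps the
    results as JSON files under <out_dir>/analysis.
    """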
def __init__(self, crawl_dir, out_dir):
self.crawl_dir = get_crawl_dir(crawl_dir)
self.crawl_name = basename(crawl_dir.rstrip(sep))
self.crawl_db_path = get_crawl_db_path(self.crawl_dir)
self.command_fail_rate = {}
self.command_timeout_rate = {}
self.init_db()
self.out_dir = join(out_dir, "analysis")
self.init_out_dir()
self.visit_id_site_urls = self.get_visit_id_site_url_mapping()
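        # Per-site ("sv_") counters and sets below are keyed by the visited
        # site URL; tp_to_publishers maps each third-party domain to the
        # set of publisher sites it was observed on.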
self.sv_num_requests = defaultdict(int)
self.sv_num_responses = defaultdict(int)
self.sv_num_javascript = defaultdict(int)
self.sv_num_third_parties = defaultdict(int)
self.num_entries_without_visit_id = defaultdict(int)
self.num_entries = defaultdict(int)
self.sv_third_parties = defaultdict(set)
self.tp_to_publishers = defaultdict(set)
self.rows_without_visit_id = 0
def init_db(self):
self.db_conn = sqlite3.connect(self.crawl_db_path)
self.db_conn.row_factory = sqlite3.Row
self.optimize_db()
def init_out_dir(self):
if not isdir(self.out_dir):
os.makedirs(self.out_dir)
def optimize_db(self, size_in_gb=20):
""" Runs PRAGMA queries to make sqlite better """
self.db_conn.execute("PRAGMA cache_size = -%i" % (size_in_gb * 10**6))
# Store temp tables, indices in memory
self.db_conn.execute("PRAGMA temp_store = 2")
# self.db_conn.execute("PRAGMA synchronous = NORMAL;")
self.db_conn.execute("PRAGMA synchronous = OFF;")
# self.db_conn.execute("PRAGMA journal_mode = WAL;")
self.db_conn.execute("PRAGMA journal_mode = OFF;")
self.db_conn.execute("PRAGMA page_size = 32768;")
def run_all_streaming_analysis(self):
self.run_streaming_analysis_for_table(HTTP_REQUESTS_TABLE)
self.run_streaming_analysis_for_table(HTTP_RESPONSES_TABLE)
self.run_streaming_analysis_for_table(JAVASCRIPT_TABLE)
def get_visit_id_site_url_mapping(self):
visit_id_site_urls = {}
for visit_id, site_url in self.db_conn.execute(
"SELECT visit_id, site_url FROM site_visits"):
visit_id_site_urls[visit_id] = site_url
        print len(visit_id_site_urls), "visit id to site URL mappings"
print "Distinct site urls", len(set(visit_id_site_urls.values()))
return visit_id_site_urls
def run_streaming_analysis_for_table(self, table_name):
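        """Stream over a table once and update the per-site aggregates."""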
current_visit_ids = {}
processed = 0
cols_to_select = ["visit_id", "crawl_id"]
print "Will analyze %s" % table_name
if table_name == HTTP_REQUESTS_TABLE:
cols_to_select.append("url")
# check whether top_level_url is here
            # ultimately preprocessing will make sure all tables contain
# top_level_url
try:
self.db_conn.execute("SELECT top_level_url FROM %s LIMIT 1" %
table_name)
cols_to_select.append("top_level_url")
except Exception:
pass
query = "SELECT %s FROM %s" % (",".join(cols_to_select), table_name)
for row in self.db_conn.execute(query):
processed += 1
visit_id = int(row["visit_id"])
crawl_id = int(row["crawl_id"])
if visit_id == -1:
self.rows_without_visit_id += 1
continue
site_url = self.visit_id_site_urls[visit_id]
if table_name == HTTP_REQUESTS_TABLE:
                # use top_level_url if present, otherwise fall back to the
                # site URL of the visit
self.sv_num_requests[site_url] += 1
top_url = None
if "top_level_url" in row:
top_url = row["top_level_url"]
if top_url is None:
top_url = self.visit_id_site_urls[visit_id]
if top_url:
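                    # util.is_third_party is assumed to return a tuple of
                    # (is_third_party, request_ps1, top_ps1), comparing the
                    # registrable domains of the request and top-level URLs.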
is_tp, req_ps1, _ = util.is_third_party(
row["url"], top_url)
if is_tp:
self.sv_third_parties[site_url].add(req_ps1)
self.sv_num_third_parties[site_url] = len(
self.sv_third_parties[site_url])
self.tp_to_publishers[req_ps1].add(site_url)
else:
print "Warning, missing top_url", row
elif table_name == HTTP_RESPONSES_TABLE:
self.sv_num_responses[site_url] += 1
elif table_name == JAVASCRIPT_TABLE:
self.sv_num_javascript[site_url] += 1
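            # Track the largest visit_id seen per crawl_id so rows that
            # arrive out of order can be flagged below.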
if crawl_id not in current_visit_ids:
current_visit_ids[crawl_id] = visit_id
            # a greater visit_id means the previous visit's data is complete
elif visit_id > current_visit_ids[crawl_id]:
# self.process_visit_data(current_visit_data[crawl_id])
# if site_url in self.sv_third_parties:
# del self.sv_third_parties[site_url]
current_visit_ids[crawl_id] = visit_id
elif visit_id < current_visit_ids[crawl_id] and visit_id > 0:
# raise Exception(
# "Out of order row! Curr: %s Row: %s Crawl id: %s" %
# (current_visit_ids[crawl_id], visit_id, crawl_id))
print ("Warning: Out of order row! Curr: %s Row: %s"
" Crawl id: %s" % (
current_visit_ids[crawl_id], visit_id, crawl_id))
self.dump_crawl_data(table_name)
def print_num_of_rows(self):
print "Will print the number of rows"
db_schema_str = get_table_and_column_names(self.crawl_db_path)
for table_name in OPENWPM_TABLES:
# TODO: search in table names instead of the db schema
if table_name in db_schema_str:
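                # MAX(id) is a cheap proxy for the row count when the table
                # has an autoincrement id column; otherwise use COUNT(*).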
try:
num_rows = self.db_conn.execute(
"SELECT MAX(id) FROM %s" % table_name).fetchone()[0]
except sqlite3.OperationalError:
num_rows = self.db_conn.execute(
"SELECT COUNT(*) FROM %s" % table_name).fetchone()[0]
if num_rows is None:
num_rows = 0
print "Total rows", table_name, num_rows
def dump_crawl_data(self, table_name):
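        """Write the aggregates collected for the given table as JSON."""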
if table_name == HTTP_REQUESTS_TABLE:
self.dump_json(self.sv_num_requests, "sv_num_requests.json")
self.dump_json(self.sv_num_third_parties,
"sv_num_third_parties.json")
# self.dump_json(self.sv_third_parties, "sv_third_parties.json")
tp_to_publishers = {tp: "\t".join(publishers) for (tp, publishers)
in self.tp_to_publishers.iteritems()}
self.dump_json(tp_to_publishers, "tp_to_publishers.json")
elif table_name == HTTP_RESPONSES_TABLE:
self.dump_json(self.sv_num_responses, "sv_num_responses.json")
elif table_name == JAVASCRIPT_TABLE:
self.dump_json(self.sv_num_javascript, "sv_num_javascript.json")
def dump_json(self, obj, out_file):
dump_as_json(obj, join(self.out_dir, "%s_%s" % (self.crawl_name,
out_file)))
def start_analysis(self):
self.print_num_of_rows()
self.check_crawl_history()
self.run_all_streaming_analysis()
self.dump_entries_without_visit_ids()
def get_num_entries_without_visit_id(self, table_name):
query = "SELECT count(*) FROM %s WHERE visit_id = -1;" % table_name
try:
return self.db_conn.execute(query).fetchone()[0]
except Exception:
return 0
def get_num_entries(self, table_name):
query = "SELECT count(*) FROM %s;" % table_name
return self.db_conn.execute(query).fetchone()[0]
def dump_entries_without_visit_ids(self):
"""All these metrics can be computed during the streaming analysis."""
self.num_entries[HTTP_REQUESTS_TABLE] = self.get_num_entries(
HTTP_REQUESTS_TABLE)
self.num_entries_without_visit_id[HTTP_REQUESTS_TABLE] = \
self.get_num_entries_without_visit_id(HTTP_REQUESTS_TABLE)
self.num_entries[HTTP_RESPONSES_TABLE] = self.get_num_entries(
HTTP_RESPONSES_TABLE)
self.num_entries_without_visit_id[HTTP_RESPONSES_TABLE] =\
self.get_num_entries_without_visit_id(HTTP_RESPONSES_TABLE)
self.num_entries[JAVASCRIPT_TABLE] = self.get_num_entries(
JAVASCRIPT_TABLE)
self.num_entries_without_visit_id[JAVASCRIPT_TABLE] =\
self.get_num_entries_without_visit_id(JAVASCRIPT_TABLE)
self.dump_json(self.num_entries_without_visit_id,
"entries_without_visit_id.json")
self.dump_json(self.num_entries, "num_entries.json")
def check_crawl_history(self):
"""Compute failure and timeout rates for crawl_history table."""
command_counts = {} # num. of total commands by type
fails = {} # num. of failed commands grouped by cmd type
timeouts = {} # num. of timeouts
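        # In the crawl_history table bool_success is 1 for success, 0 for
        # failure and -1 for a command timeout.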
for row in self.db_conn.execute(
"""SELECT command, count(*)
FROM crawl_history
GROUP BY command;""").fetchall():
command_counts[row["command"]] = row["count(*)"]
print "crawl_history Totals", row["command"], row["count(*)"]
for row in self.db_conn.execute(
"""SELECT command, count(*)
FROM crawl_history
WHERE bool_success = 0
GROUP BY command;""").fetchall():
fails[row["command"]] = row["count(*)"]
print "crawl_history Fails", row["command"], row["count(*)"]
for row in self.db_conn.execute(
"""SELECT command, count(*)
FROM crawl_history
WHERE bool_success = -1
GROUP BY command;""").fetchall():
timeouts[row["command"]] = row["count(*)"]
print "crawl_history Timeouts", row["command"], row["count(*)"]
for command in command_counts.keys():
self.command_fail_rate[command] = (fails.get(command, 0) /
command_counts[command])
self.command_timeout_rate[command] = (timeouts.get(command, 0) /
command_counts[command])
self.dump_json(self.command_fail_rate, "command_fail_rate.json")
self.dump_json(self.command_timeout_rate,
"command_timeout_rate.json")
if __name__ == '__main__':
t0 = time()
crawl_db_check = CrawlDBAnalysis(sys.argv[1], sys.argv[2])
crawl_db_check.start_analysis()
print "Analysis finished in %0.1f mins" % ((time() - t0) / 60)