Commit 32ec2b8
Print the number of rows for each table.
Rename CrawlHistory to crawl_history (Fixes #11).
When assigning visit_ids, follow the site_url order from the http_requests table.
1 parent a4a725b commit 32ec2b8

5 files changed: +118, -58 lines
analyze_crawl.py

Lines changed: 36 additions & 22 deletions
@@ -4,12 +4,12 @@
 import os
 from os.path import isfile, join
 from collections import defaultdict
-from tqdm import tqdm
+# from tqdm import tqdm
 import util
 from db_schema import (HTTP_REQUESTS_TABLE,
                        HTTP_RESPONSES_TABLE,
-                       JAVASCRIPT_TABLE)
-from util import dump_as_json
+                       JAVASCRIPT_TABLE, OPENWPM_TABLES)
+from util import dump_as_json, get_table_and_column_names
 
 
 class CrawlDBAnalysis(object):
@@ -44,21 +44,16 @@ def run_all_streaming_analysis(self):
 
     def get_visit_id_site_url_mapping(self):
         visit_id_site_urls = {}
-        from time import time
-        t0 = time()
         for visit_id, site_url in self.db_conn.execute(
                 "SELECT visit_id, site_url FROM site_visits"):
             visit_id_site_urls[visit_id] = site_url
-        print len(visit_id_site_urls), "Mappings. Took %s s" % (time() - t0)
+        print len(visit_id_site_urls), "mappings"
         print "Distinct site urls", len(set(visit_id_site_urls.values()))
         return visit_id_site_urls
 
     def run_streaming_analysis_for_table(self, table_name):
         current_visit_ids = {}
         processed = 0
-        num_rows = self.db_conn.execute(
-            "SELECT MAX(id) FROM %s" % table_name).fetchone()[0]
-        print "Total rows", num_rows, table_name
         cols_to_select = ["visit_id", "crawl_id"]
         if table_name == HTTP_REQUESTS_TABLE:
             cols_to_select.append("url")
@@ -73,7 +68,7 @@ def run_streaming_analysis_for_table(self, table_name):
             pass
 
         query = "SELECT %s FROM %s" % (",".join(cols_to_select), table_name)
-        for row in tqdm(self.db_conn.execute(query)):
+        for row in self.db_conn.execute(query):
             processed += 1
             visit_id = int(row["visit_id"])
             crawl_id = int(row["crawl_id"])
@@ -111,15 +106,33 @@ def run_streaming_analysis_for_table(self, table_name):
             # end of the data from the current visit
             elif visit_id > current_visit_ids[crawl_id]:
                 # self.process_visit_data(current_visit_data[crawl_id])
-                if site_url in self.sv_third_parties:
-                    del self.sv_third_parties[site_url]
+                # if site_url in self.sv_third_parties:
+                #     del self.sv_third_parties[site_url]
                 current_visit_ids[crawl_id] = visit_id
             elif visit_id < current_visit_ids[crawl_id] and visit_id > 0:
-                raise Exception(
-                    "Out of order row! Curr: %s Row: %s Crawl id: %s" %
-                    (current_visit_ids[crawl_id], visit_id, crawl_id))
+                # raise Exception(
+                #     "Out of order row! Curr: %s Row: %s Crawl id: %s" %
+                #     (current_visit_ids[crawl_id], visit_id, crawl_id))
+                print "Warning: Out of order row! Curr: %s Row: %s Crawl id: %s" % (current_visit_ids[crawl_id], visit_id, crawl_id)
+
         self.dump_crawl_data(table_name)
 
+    def print_num_of_rows(self):
+        print "Will print the number of rows"
+        db_schema_str = get_table_and_column_names(self.db_path)
+        for table_name in OPENWPM_TABLES:
+            # TODO: search in table names instead of the db schema
+            if table_name in db_schema_str:
+                try:
+                    num_rows = self.db_conn.execute(
+                        "SELECT MAX(id) FROM %s" % table_name).fetchone()[0]
+                except sqlite3.OperationalError:
+                    num_rows = self.db_conn.execute(
+                        "SELECT COUNT(*) FROM %s" % table_name).fetchone()[0]
+                if num_rows is None:
+                    num_rows = 0
+                print "Total rows", table_name, num_rows
+
     def dump_crawl_data(self, table_name):
         if table_name == HTTP_REQUESTS_TABLE:
             self.dump_json(self.sv_num_requests, "sv_num_requests.json")
@@ -139,36 +152,37 @@ def dump_json(self, obj, out_file):
                                  out_file)))
 
     def start_analysis(self):
+        self.print_num_of_rows()
         self.check_crawl_history()
         self.run_all_streaming_analysis()
 
     def check_crawl_history(self):
-        """Compute failure and timeout rates for CrawlHistory table."""
+        """Compute failure and timeout rates for crawl_history table."""
         command_counts = {}  # num. of total commands by type
         fails = {}  # num. of failed commands grouped by cmd type
         timeouts = {}  # num. of timeouts
         for row in self.db_conn.execute(
                 """SELECT command, count(*)
-                FROM CrawlHistory
+                FROM crawl_history
                 GROUP BY command;""").fetchall():
             command_counts[row["command"]] = row["count(*)"]
-            print "CrawlHistory Totals", row["command"], row["count(*)"]
+            print "crawl_history Totals", row["command"], row["count(*)"]
 
         for row in self.db_conn.execute(
                 """SELECT command, count(*)
-                FROM CrawlHistory
+                FROM crawl_history
                 WHERE bool_success = 0
                 GROUP BY command;""").fetchall():
             fails[row["command"]] = row["count(*)"]
-            print "CrawlHistory Fails", row["command"], row["count(*)"]
+            print "crawl_history Fails", row["command"], row["count(*)"]
 
         for row in self.db_conn.execute(
                 """SELECT command, count(*)
-                FROM CrawlHistory
+                FROM crawl_history
                 WHERE bool_success = -1
                 GROUP BY command;""").fetchall():
             timeouts[row["command"]] = row["count(*)"]
-            print "CrawlHistory Timeouts", row["command"], row["count(*)"]
+            print "crawl_history Timeouts", row["command"], row["count(*)"]
 
         for command in command_counts.keys():
            self.command_fail_rate[command] = (fails.get(command, 0) /
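
The new print_num_of_rows step prefers SELECT MAX(id), which SQLite can answer without scanning the table when id is the INTEGER PRIMARY KEY, and only falls back to a full COUNT(*) when the fast query fails. A minimal standalone sketch of that fallback, assuming a crawl database file; the path and table name below are illustrative, and MAX(id) is only an approximation of the row count when ids have gaps:

    import sqlite3

    def count_rows(db_path, table_name):
        # Approximate row count; exact only if ids are gapless and start at 1.
        conn = sqlite3.connect(db_path)
        try:
            # Fast path: MAX(id) avoids a full table scan on a rowid-backed id.
            num_rows = conn.execute(
                "SELECT MAX(id) FROM %s" % table_name).fetchone()[0]
        except sqlite3.OperationalError:
            # No usable id column: fall back to a full COUNT(*) scan.
            num_rows = conn.execute(
                "SELECT COUNT(*) FROM %s" % table_name).fetchone()[0]
        conn.close()
        return num_rows or 0

    print(count_rows("crawl.sqlite", "http_requests"))  # hypothetical path/table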

batch-process.sh

Lines changed: 26 additions & 11 deletions
@@ -1,26 +1,41 @@
 #!/bin/bash
-set -e
+#set -e
 
 CENSUS_LZ4_DATA_PATH="/mnt/10tb4/census_data_lz4"
 
+CENSUS_NORMALIZED_LZ4_DATA_PATH="/mnt/10tb4/census_data_lz4/normalized/"
+
 # We'll extract, process and delete each compressed crawl data
-EXTRACTION_DIR="/mnt/ssd/census_tmp"
+# EXTRACTION_DIR="/mnt/ssd/census_tmp"
+EXTRACTION_DIR="/tmp/census_tmp"
 
 function decompress_and_process(){
     ARCHIVE_BASE_NAME=$(basename "$1")
     CRAWL_NAME=${ARCHIVE_BASE_NAME/.tar.lz4/}
     CRAWL_DATA_PATH=$EXTRACTION_DIR/$CRAWL_NAME
     echo "Will extract $1 to $CRAWL_DATA_PATH"
-    time lz4 -dc --no-sparse $1 | tar xf - -C $EXTRACTION_DIR
-    time python process_crawl_data.py $CRAWL_DATA_PATH
-    # ls -l $EXTRACTION_DIR/*201*/201*.sqlite
-    # echo "Will vacuum the database"
-    # time sqlite3 $EXTRACTION_DIR/*201*/*201*.sqlite 'VACUUM;'
-    # ls -l $EXTRACTION_DIR/*201*/*201*.sqlite
-    echo "Will remove $EXTRACTION_DIR/201*"
+    time lz4 -qdc --no-sparse $1 | tar xf - -C $EXTRACTION_DIR
+    python process_crawl_data.py $CRAWL_DATA_PATH
+    echo "Size before vacuuming"
+    ls -hl $EXTRACTION_DIR/*201*/201*.sqlite
+    time sqlite3 $EXTRACTION_DIR/*201*/*201*.sqlite 'VACUUM;'
+    echo "Size after vacuuming"
+    ls -hl $EXTRACTION_DIR/*201*/*201*.sqlite
+    mkdir -p $CENSUS_NORMALIZED_LZ4_DATA_PATH/$2
+
+    OUT_NORMALIZED_ARCHIVE=$EXTRACTION_DIR/$ARCHIVE_BASE_NAME
+    pushd .
+    cd $EXTRACTION_DIR
+    tar c *201* | lz4 -zq - $OUT_NORMALIZED_ARCHIVE
+    popd
+    scp $OUT_NORMALIZED_ARCHIVE odin://mnt/10tb2/census-release-normalized/$2/
+    rm $OUT_NORMALIZED_ARCHIVE
+    echo "Will remove $EXTRACTION_DIR/*201*"
     rm -rf $EXTRACTION_DIR/*201*
+    # !!! retain the original archive
+    # rm $1
 }
 
-for crawl_archive_lz4 in $CENSUS_LZ4_DATA_PATH/*.tar.lz4
-  do decompress_and_process $crawl_archive_lz4
+for crawl_archive_lz4 in $CENSUS_LZ4_DATA_PATH/$1/*.tar.lz4
+  do decompress_and_process $crawl_archive_lz4 $1
 done;
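
batch-process.sh now takes a crawl-group argument ($1), vacuums the extracted database, repacks the normalized result under census_data_lz4/normalized, copies it to the odin host, and retains the original archive. If the extract-and-process half of that per-archive flow were driven from Python instead of bash, a rough subprocess sketch could look like the following; the paths come from the script, while the function name is purely illustrative:

    import os
    import subprocess

    EXTRACTION_DIR = "/tmp/census_tmp"  # same default the script now uses

    def extract_and_process(archive_path):
        crawl_name = os.path.basename(archive_path).replace(".tar.lz4", "")
        crawl_data_path = os.path.join(EXTRACTION_DIR, crawl_name)
        # Equivalent of: lz4 -qdc --no-sparse $1 | tar xf - -C $EXTRACTION_DIR
        lz4 = subprocess.Popen(["lz4", "-qdc", "--no-sparse", archive_path],
                               stdout=subprocess.PIPE)
        subprocess.check_call(["tar", "xf", "-", "-C", EXTRACTION_DIR],
                              stdin=lz4.stdout)
        lz4.stdout.close()
        lz4.wait()
        # Equivalent of: python process_crawl_data.py $CRAWL_DATA_PATH
        subprocess.check_call(["python", "process_crawl_data.py",
                               crawl_data_path])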

db_schema.py

Lines changed: 18 additions & 2 deletions
@@ -1,5 +1,5 @@
 # TODO:task and crawl have different, non-overlapping columns across versions.
-# xpath, site_visits, CrawlHistory, http_redirects has one version only
+# xpath, site_visits, crawl_history, http_redirects has one version only
 # flash cookies, profile_cookies has page_url/visit_id difference
 # content_policy, pages: no table
 
@@ -134,7 +134,7 @@
 JAVASCRIPT_TABLE = "javascript"
 JAVASCRIPT_COOKIES_TABLE = "javascript_cookies"
 SITE_VISITS_TABLE = "site_visits"
-CRAWL_HISTORY_TABLE = "CrawlHistory"
+CRAWL_HISTORY_TABLE = "crawl_history"
 CRAWL_TABLE = "crawl"
 TASK_TABLE = "task"
 HTTP_REQUESTS_PROXY_TABLE = "http_requests_proxy"
@@ -150,3 +150,19 @@
     FLASH_COOKIES_TABLE: DB_SCHEMA_FLASH_COOKIES,
     PROFILE_COOKIES_TABLE: DB_SCHEMA_PROFILE_COOKIES,
 }
+
+OPENWPM_TABLES = [
+    HTTP_REQUESTS_TABLE,
+    HTTP_RESPONSES_TABLE,
+    JAVASCRIPT_TABLE,
+    JAVASCRIPT_COOKIES_TABLE,
+    SITE_VISITS_TABLE,
+    CRAWL_HISTORY_TABLE,
+    CRAWL_TABLE,
+    TASK_TABLE,
+    HTTP_REQUESTS_PROXY_TABLE,
+    HTTP_RESPONSES_PROXY_TABLE,
+    PROFILE_COOKIES_TABLE,
+    FLASH_COOKIES_TABLE,
+    LOCALSTORAGE_TABLE
+]
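
OPENWPM_TABLES is what the new print_num_of_rows loop in analyze_crawl.py iterates over, checking each name against the dumped schema string before querying it (hence the TODO about matching table names rather than the whole schema). A rough sketch of that membership check, using a direct sqlite_master query as a stand-in for util.get_table_and_column_names and a hand-written subset of the table list so the snippet runs on its own:

    import sqlite3

    # Subset of db_schema.OPENWPM_TABLES, spelled out so the sketch is standalone.
    OPENWPM_TABLES = ["http_requests", "http_responses", "javascript",
                      "site_visits", "crawl_history", "crawl", "task"]

    def existing_openwpm_tables(db_path):
        conn = sqlite3.connect(db_path)
        # Stand-in for util.get_table_and_column_names: join CREATE statements.
        db_schema_str = "\n".join(
            row[0] for row in conn.execute(
                "SELECT sql FROM sqlite_master WHERE type = 'table'") if row[0])
        conn.close()
        # Substring match, as in the current code; a column named like a table
        # would also match, which is what the TODO wants to tighten.
        return [name for name in OPENWPM_TABLES if name in db_schema_str]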

normalize_db.py

Lines changed: 22 additions & 16 deletions
@@ -11,6 +11,13 @@
                         JAVASCRIPT_TABLE, JAVASCRIPT_COOKIES_TABLE]
 
 
+def rename_crawl_history_table(con):
+    try:
+        con.execute("ALTER TABLE CrawlHistory RENAME TO crawl_history;")
+    except sqlite3.OperationalError:
+        pass
+
+
 def add_visit_id_col_to_tables(con):
     for table_name in TABLES_WITH_TOP_URL:
         try:
@@ -38,8 +45,9 @@ def add_site_visits_table(con):
     # See http://alweeam.com.sa in 2016-01_spider_4 for an example
     # The following query causes
     # query = "select DISTINCT top_url, MAX(crawl_id) from http_requests"
-    query = "SELECT top_url, MAX(crawl_id) FROM http_requests GROUP BY top_url"
-    for visit_id, (top_url, crawl_id) in enumerate(cur.execute(query)):
+    query = """SELECT top_url, MAX(crawl_id), MIN(id) as min_id FROM
+               http_requests GROUP BY top_url ORDER by min_id ASC"""
+    for visit_id, (top_url, crawl_id, _) in enumerate(cur.execute(query), 1):
         if not top_url:
             print "Warning: Empty top-url", top_url, crawl_id
         site_visits.append((visit_id, crawl_id, top_url))
@@ -83,8 +91,8 @@ def add_missing_columns(con, table_name, db_schema_str, site_url_visit_id_map):
     new_columns = get_column_names_from_create_query(
         TABLE_SCHEMAS[table_name])
     if new_columns == existing_columns:
-        print "No missing columns to add to", table_name
-        return
+        # print "No missing columns to add to", table_name
+        return False
     print "Will add missing columns to %s: %s" % (table_name, set(
         new_columns).difference(set(existing_columns)))
 
@@ -115,7 +123,7 @@ def add_missing_columns(con, table_name, db_schema_str, site_url_visit_id_map):
         cols_to_insert = common_columns + ["visit_id", ]
         stream_qry = "SELECT %s FROM %s " % (",".join(cols_to_select),
                                              tmp_table_name)
-        print "Will iterate over", stream_qry
+        # print "Will iterate over", stream_qry
         insert_qry = "INSERT INTO %s (%s) VALUES (%s)" % (
             table_name, ",".join(cols_to_insert),
             ",".join("?" * len(cols_to_insert)))
@@ -133,7 +141,7 @@ def add_missing_columns(con, table_name, db_schema_str, site_url_visit_id_map):
             # print "Will execute %s" % qry
             # con.execute(qry, row)
             processed += 1
-            if processed % 10000 == 0:
+            if processed % 100000 == 0:
                 con.executemany(insert_qry, data_to_insert)
                 del data_to_insert[:]
                 print_progress(t0, processed, num_rows)
@@ -142,7 +150,7 @@ def add_missing_columns(con, table_name, db_schema_str, site_url_visit_id_map):
         # read from the temp table and write into the new table
         stream_qry = "SELECT %s FROM %s " % (",".join(common_columns),
                                              tmp_table_name)
-        print "Will iterate over", stream_qry
+        # print "Will iterate over", stream_qry
         insert_qry = "INSERT INTO %s (%s) VALUES (%s)" % (
             table_name, ",".join(common_columns),
             ",".join("?" * len(common_columns)))
@@ -151,7 +159,7 @@ def add_missing_columns(con, table_name, db_schema_str, site_url_visit_id_map):
             # print "Will execute %s" % qry
             # con.execute(insert_qry, row)
             processed += 1
-            if processed % 10000 == 0:
+            if processed % 100000 == 0:
                 con.executemany(insert_qry, data_to_insert)
                 del data_to_insert[:]
                 print_progress(t0, processed, num_rows)
@@ -160,10 +168,8 @@ def add_missing_columns(con, table_name, db_schema_str, site_url_visit_id_map):
     print "Will drop the temp table",
     con.execute("DROP TABLE %s" % tmp_table_name)
     print "(took", time() - t0, "s)"
-    print "Will commit changes",
-    t0 = time()
     con.commit()
-    print "(took", time() - t0, "s)"
+    return True
 
 
 def get_column_names_from_create_query(create_table_query):
@@ -192,11 +198,11 @@ def add_missing_columns_to_all_tables(con, db_schema_str):
         # TODO: search in table names instead of the db schema
         if table_name in db_schema_str:
             t0 = time()
-            add_missing_columns(con, table_name, db_schema_str,
-                                site_url_visit_id_map)
-            duration = time() - t0
-            print "Took %s s to add missing columns to %s" % (duration,
-                                                              table_name)
+            if add_missing_columns(con, table_name, db_schema_str,
+                                   site_url_visit_id_map):
+                duration = time() - t0
+                print "Took %s s to add missing columns to %s" % (duration,
+                                                                  table_name)
 
 
 if __name__ == '__main__':
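
The add_site_visits_table change above is the visit_id part of the commit message: http_requests is grouped by top_url, groups are ordered by the smallest request id so visit_ids follow the order in which sites first appear, and enumerate now starts at 1 instead of 0. A self-contained illustration against an in-memory table, with the schema reduced to the three columns the query touches and made-up example URLs:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE http_requests "
                 "(id INTEGER PRIMARY KEY, top_url TEXT, crawl_id INTEGER)")
    conn.executemany(
        "INSERT INTO http_requests (top_url, crawl_id) VALUES (?, ?)",
        [("http://b.example", 1), ("http://a.example", 1),
         ("http://b.example", 1), ("http://a.example", 2)])

    query = """SELECT top_url, MAX(crawl_id), MIN(id) as min_id FROM
               http_requests GROUP BY top_url ORDER by min_id ASC"""
    site_visits = []
    for visit_id, (top_url, crawl_id, _) in enumerate(conn.execute(query), 1):
        site_visits.append((visit_id, crawl_id, top_url))

    # b.example was requested first, so it gets visit_id 1:
    # [(1, 1, 'http://b.example'), (2, 2, 'http://a.example')]
    print(site_visits)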

process_crawl_data.py

Lines changed: 16 additions & 7 deletions
@@ -1,13 +1,14 @@
 import sys
 import sqlite3
 import os
+from time import time
 from util import CRAWL_DB_EXT, get_table_and_column_names, load_alexa_ranks,\
     copy_if_not_exists
 from os.path import join, isfile, basename, isdir, dirname, sep
 import glob
 from normalize_db import add_site_visits_table, add_alexa_rank_to_site_visits,\
-    add_missing_columns_to_all_tables
-from db_schema import SITE_VISITS_TABLE
+    add_missing_columns_to_all_tables, rename_crawl_history_table
+from db_schema import SITE_VISITS_TABLE, CRAWL_HISTORY_TABLE
 from analyze_crawl import CrawlDBAnalysis
 
 ROOT_OUT_DIR = "/mnt/10tb4/census-release"
@@ -24,11 +25,9 @@
 CRONTAB_LOG_FILENAME = "crontab.log"
 ALEXA_TOP1M_CSV_FILENAME = "top-1m.csv"
 JAVASCRIPT_SRC_DIRNAME = "content.ldb"
-DEFAULT_SQLITE_CACHE_SIZE_GB = 3
+DEFAULT_SQLITE_CACHE_SIZE_GB = 16
 
-# Disable adding new columns for now
-# TODO: enable for the final runs
-ADD_MISSING_COLUMNS = False
+ADD_MISSING_COLUMNS = True
 
 
 class CrawlData(object):
@@ -65,8 +64,10 @@ def optimize_db(self, size_in_gb=DEFAULT_SQLITE_CACHE_SIZE_GB):
 
     def vacuum_db(self):
         """."""
-        print "Will vacuum the DB"
+        print "Will vacuum the DB",
+        t0 = time()
         self.db_conn.execute("VACUUM;")
+        print "finished in", float(time() - t0) / 60, "mins"
 
     def set_crawl_dir(self, crawl_dir):
         if isdir(crawl_dir):
@@ -117,6 +118,9 @@ def normalize_db(self):
         if SITE_VISITS_TABLE not in db_schema_str:
             print "Adding site_visits table"
             add_site_visits_table(self.db_conn)
+        if CRAWL_HISTORY_TABLE not in db_schema_str:
+            print "Renaming CrawlHistory table to crawl_history"
+            rename_crawl_history_table(self.db_conn)
         # Add site ranks to site_visits table
         if "site_rank" not in db_schema_str:
             if self.alexa_csv_path:
@@ -127,6 +131,7 @@ def normalize_db(self):
                 print "Missing Alexa ranks CSV, can't add ranks to site_visits"
         if ADD_MISSING_COLUMNS:
             add_missing_columns_to_all_tables(self.db_conn, db_schema_str)
+        print "Will commit the changes"
         self.db_conn.commit()
 
     def dump_db_schema(self):
@@ -158,8 +163,12 @@ def backup_crawl_files(self):
 
 
 if __name__ == '__main__':
+    t0 = time()
     crawl_data = CrawlData(sys.argv[1])
     crawl_data.pre_process()
+    t1 = time()
+    print "Preprocess finished in", float(t1 - t0) / 60, "mins"
     analysis = CrawlDBAnalysis(crawl_data.crawl_db_path, ANALYSIS_OUT_DIR,
                                crawl_data.crawl_name)
     analysis.start_analysis()
+    print "Analysis finished in", float(time() - t1) / 60, "mins"
