crawler.py

import datetime
import hashlib
import logging
import traceback

import iatikit
import sqlalchemy as sa
from dateutil.parser import parse as date_parser
from flask import Blueprint
import click

from iatilib import db, parse, rq
from iatilib.model import Dataset, Resource, Activity, Log, DeletedActivity
from iatilib.loghandlers import DatasetMessage as _

log = logging.getLogger("crawler")

manager = Blueprint('crawler', __name__)
manager.cli.short_help = "Crawl IATI registry"
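
# Note: the commands defined below hang off this blueprint's CLI group. Assuming
# the blueprint is registered on the Flask app under its default name, they are
# typically invoked as, for example:
#
#   flask crawler update --dataset <dataset-name>
#   flask crawler status
#
# (The exact invocation depends on how the application wires up its CLI.)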


def fetch_dataset_list():
    '''
    Fetches the dataset list from iatikit and stores it in the DB.

    Used by update() to populate the Flask job queue. Uses CKAN metadata to
    determine whether a dataset is active or has been deleted.
    :return:
    '''
    existing_datasets = Dataset.query.all()
    existing_ds_names = set(ds.name for ds in existing_datasets)
    package_list = [d.name for d in iatikit.data().datasets]
    incoming_ds_names = set(package_list)

    new_datasets = [Dataset(name=n) for n
                    in incoming_ds_names - existing_ds_names]
    all_datasets = existing_datasets + new_datasets
    last_seen = iatikit.data().last_updated
    for dataset in all_datasets:
        dataset.last_seen = last_seen
    db.session.add_all(all_datasets)
    db.session.commit()

    deleted_ds_names = existing_ds_names - incoming_ds_names
    if deleted_ds_names:
        delete_datasets(deleted_ds_names)

    all_datasets = Dataset.query
    return all_datasets
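
# Note: fetch_dataset_list() returns a Dataset query rather than the in-memory
# list built above, which lets callers such as update_registry() call .count()
# on it and iterate over it lazily.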


def delete_datasets(datasets):
    deleted = sum([
        delete_dataset(dataset)
        for dataset in datasets])
    log.info("Deleted {0} datasets".format(deleted))
    return deleted


def delete_dataset(dataset):
    deleted_dataset = db.session.query(Dataset). \
        filter(Dataset.name == dataset)
    activities_to_delete = db.session.query(Activity). \
        filter(Activity.resource_url == Resource.url). \
        filter(Resource.dataset_id == dataset)
    now = datetime.datetime.now()
    for a in activities_to_delete:
        db.session.merge(DeletedActivity(
            iati_identifier=a.iati_identifier,
            deletion_date=now
        ))
    db.session.commit()
    return deleted_dataset.delete(synchronize_session='fetch')


def fetch_dataset_metadata(dataset):
    d = iatikit.data().datasets.get(dataset.name)
    dataset.publisher = d.metadata['organization']['name']
    dataset.last_modified = date_parser(
        d.metadata.get(
            'metadata_modified',
            datetime.datetime.now().date().isoformat()))
    new_urls = [resource['url'] for resource
                in d.metadata.get('resources', [])
                if resource['url'] not in dataset.resource_urls]
    dataset.resource_urls.extend(new_urls)

    urls = [resource['url'] for resource
            in d.metadata.get('resources', [])]
    for deleted in set(dataset.resource_urls) - set(urls):
        dataset.resource_urls.remove(deleted)

    dataset.license = d.metadata.get('license_id')
    dataset.is_open = d.metadata.get('isopen', False)
    db.session.add(dataset)
    return dataset


def fetch_resource(dataset, ignore_hashes):
    '''
    Gets the dataset's resource and sets the last fetch/success times based on
    the status code.

    If `ignore_hashes` is True, `last_parsed` is set to None even when the
    content is unchanged, so an update will be triggered.
    :param dataset:
    :param ignore_hashes:
    :return:
    '''
    d = iatikit.data().datasets.get(dataset.name)
    last_updated = iatikit.data().last_updated
    resource = dataset.resources[0]
    resource.last_fetch = last_updated
    try:
        content = d.raw_xml
        resource.last_status_code = 200
        resource.last_succ = last_updated
        if (not resource.document) or \
                (hash(resource.document) != hash(content)) or \
                ignore_hashes:
            resource.document = content
            resource.last_parsed = None
            resource.last_parse_error = None
    except IOError:
        # TODO: this isn't true
        resource.last_status_code = 404
    db.session.add(resource)
    return resource
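
# Note on change detection: the resource's stored document is only replaced (and
# last_parsed reset to None) when the MD5 digest of the cached XML differs from
# the digest of the freshly downloaded XML, or when ignore_hashes forces it.
# update_dataset() later uses a None last_parsed as the signal to enqueue a
# parse job, so unchanged datasets are not re-parsed.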


def check_for_duplicates(activities):
    if activities:
        dup_activity = Activity.query.filter(
            Activity.iati_identifier.in_(
                a.iati_identifier for a in activities
            )
        )
        with db.session.no_autoflush:
            for db_activity in dup_activity:
                res_activity = next(
                    a for a in activities
                    if a.iati_identifier == db_activity.iati_identifier
                )
                activities.remove(res_activity)
                db.session.expunge(res_activity)
    return activities


def hash(string):
    m = hashlib.md5()
    m.update(string)
    return m.digest()
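
# Note: this module-level hash() shadows the builtin and returns a raw 16-byte
# MD5 digest. hashlib requires bytes, so callers pass bytes directly (e.g.
# resource.document) or encode first, as in hash(activity.raw_xml.encode('utf-8')).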


def parse_activity(new_identifiers, old_xml, resource):
    for activity in parse.document_from_bytes(resource.document, resource):
        activity.resource = resource

        if activity.iati_identifier not in new_identifiers:
            new_identifiers.add(activity.iati_identifier)
            try:
                if hash(activity.raw_xml.encode('utf-8')) == old_xml[activity.iati_identifier][1]:
                    activity.last_change_datetime = old_xml[activity.iati_identifier][0]
                else:
                    activity.last_change_datetime = datetime.datetime.now()
            except KeyError:
                activity.last_change_datetime = datetime.datetime.now()
            db.session.add(activity)
            check_for_duplicates([activity])
        else:
            parse.log.warn(
                _("Duplicate identifier {0} in same resource document".format(
                    activity.iati_identifier),
                  logger='activity_importer', dataset=resource.dataset_id, resource=resource.url),
                exc_info=''
            )

    db.session.flush()
    db.session.commit()


def parse_resource(resource):
    db.session.add(resource)
    current = Activity.query.filter_by(resource_url=resource.url)
    current_identifiers = set([i.iati_identifier for i in current.all()])

    # build a lookup of iati-identifier -> (last_change_datetime, hash of raw XML)
    # for every activity currently stored against this resource URL.
    old_xml = dict([(i[0], (i[1], hash(i[2].encode('utf-8')))) for i in db.session.query(
        Activity.iati_identifier, Activity.last_change_datetime,
        Activity.raw_xml).filter_by(resource_url=resource.url)])

    db.session.query(Activity).filter_by(resource_url=resource.url).delete()
    new_identifiers = set()
    parse_activity(new_identifiers, old_xml, resource)

    resource.version = parse.document_metadata(resource.document)

    # add any identifiers that are no longer present to the deleted_activity table
    diff = current_identifiers - new_identifiers
    now = datetime.datetime.utcnow()
    deleted = [
        DeletedActivity(iati_identifier=deleted_activity, deletion_date=now)
        for deleted_activity in diff]
    if deleted:
        db.session.add_all(deleted)

    # remove any new identifiers from the deleted_activity table
    if new_identifiers:
        db.session.query(DeletedActivity) \
            .filter(DeletedActivity.iati_identifier.in_(new_identifiers)) \
            .delete(synchronize_session="fetch")

    log.info(
        "Parsed %d activities from %s",
        resource.activities.count(),
        resource.url)
    resource.last_parsed = now
    return resource  # , new_identifiers
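
# Note: parse_resource() effectively does a delete-and-reinsert of every activity
# for the resource URL, while preserving each activity's last_change_datetime via
# the old_xml hash lookup whenever the raw XML is unchanged. The set difference
# current_identifiers - new_identifiers is what feeds the deleted_activity table.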


def update_activities(dataset_name):
    '''
    Parses and stores the raw XML associated with a dataset's resource [see
    parse_resource()], or logs the parse failure.
    :param dataset_name:
    :return:
    '''
    # clear up previous job queue log errors
    db.session.query(Log).filter(sa.and_(
        Log.logger == 'job iatilib.crawler.update_activities',
        Log.resource == dataset_name,
    )).delete(synchronize_session=False)
    db.session.commit()

    dataset = Dataset.query.get(dataset_name)
    resource = dataset.resources[0]
    try:
        db.session.query(Log).filter(sa.and_(
            Log.logger.in_(
                ['activity_importer', 'failed_activity', 'xml_parser']),
            Log.resource == dataset_name,
        )).delete(synchronize_session=False)
        parse_resource(resource)
        db.session.commit()
    except parse.ParserError as exc:
        db.session.rollback()
        resource.last_parse_error = str(exc)
        db.session.add(resource)
        db.session.add(Log(
            dataset=resource.dataset_id,
            resource=resource.url,
            logger="xml_parser",
            msg="Failed to parse XML file {0}, error was {1}".format(dataset_name, exc),
            level="error",
            trace=traceback.format_exc(),
            created_at=datetime.datetime.now()
        ))
        db.session.commit()


def update_dataset(dataset_name, ignore_hashes):
    '''
    Takes the dataset name and determines whether an update is needed, based on
    whether a last successful update exists and whether the dataset has been
    updated since the contained data last changed.

    If ignore_hashes is True, an update is triggered regardless of whether the
    dataset hash appears to differ from the one stored in the database.
    :param dataset_name:
    :param ignore_hashes:
    :return:
    '''
    # clear up previous job queue log errors
    db.session.query(Log).filter(sa.and_(
        Log.logger == 'job iatilib.crawler.update_dataset',
        Log.dataset == dataset_name,
    )).delete(synchronize_session=False)
    db.session.commit()

    queue = rq.get_queue()
    dataset = Dataset.query.get(dataset_name)

    fetch_dataset_metadata(dataset)
    try:
        db.session.commit()
    except sa.exc.IntegrityError as exc:
        db.session.rollback()
        # the resource can't be added, so we should give up.
        db.session.add(Log(
            dataset=dataset_name,
            resource=None,
            logger="update_dataset",
            msg="Failed to update dataset {0}, error was {1}".format(dataset_name, exc),
            level="error",
            trace=traceback.format_exc(),
            created_at=datetime.datetime.now()
        ))
        db.session.commit()
        return

    resource = fetch_resource(dataset, ignore_hashes)
    db.session.commit()

    if resource.last_status_code == 200 and not resource.last_parsed:
        queue.enqueue(
            update_activities, args=(dataset_name,),
            result_ttl=0, job_timeout=100000,
            # See https://github.com/codeforIATI/iati-datastore/issues/285
            # for discussion of at_front=True
            at_front=True
        )
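
# Note: the enqueue above fires only when fetch_resource() left last_parsed as
# None, i.e. the downloaded XML was new or changed (or ignore_hashes was set),
# so a full registry update only re-parses datasets whose content actually moved.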


def status_line(msg, filt, tot):
    total_count = tot.count()
    filtered_count = filt.count()
    try:
        ratio = 1.0 * filtered_count / total_count
    except ZeroDivisionError:
        ratio = 0.0
    return "{filt_c:4d}/{tot_c:4d} ({pct:6.2%}) {msg}".format(
        filt_c=filtered_count,
        tot_c=total_count,
        pct=ratio,
        msg=msg
    )
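
# For illustration, with 12 of 474 datasets matching, status_line() would return
# something like "  12/ 474 ( 2.53%) datasets have no metadata" (exact figures
# depend on the database contents).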


@manager.cli.command('status')
def status_cmd():
    """Show status of current jobs"""
    print("%d jobs on queue" % rq.get_queue().count)

    print(status_line(
        "datasets have no metadata",
        Dataset.query.filter_by(last_modified=None),
        Dataset.query,
    ))

    print(status_line(
        "datasets not seen in the last day",
        Dataset.query.filter(Dataset.last_seen <
                             (datetime.datetime.utcnow() - datetime.timedelta(days=1))),
        Dataset.query,
    ))

    print(status_line(
        "resources have had no attempt to fetch",
        Resource.query.outerjoin(Dataset).filter(
            Resource.last_fetch == None),
        Resource.query,
    ))

    print(status_line(
        "resources not successfully fetched",
        Resource.query.outerjoin(Dataset).filter(
            Resource.last_succ == None),
        Resource.query,
    ))

    print(status_line(
        "resources not fetched since modification",
        Resource.query.outerjoin(Dataset).filter(
            sa.or_(
                Resource.last_succ == None,
                Resource.last_succ < Dataset.last_modified)),
        Resource.query,
    ))

    print(status_line(
        "resources not parsed since modification",
        Resource.query.outerjoin(Dataset).filter(
            sa.or_(
                Resource.last_succ == None,
                Resource.last_parsed < Dataset.last_modified)),
        Resource.query,
    ))

    print(status_line(
        "resources have no activities",
        db.session.query(Resource.url).outerjoin(Activity)
        .group_by(Resource.url)
        .having(sa.func.count(Activity.iati_identifier) == 0),
        Resource.query))

    print("")

    total_activities = Activity.query.count()
    # an activity is out of date if it was created before the resource was last parsed
    total_activities_fetched = Activity.query.join(Resource).filter(
        Activity.created < Resource.last_parsed).count()
    try:
        ratio = 1.0 * total_activities_fetched / total_activities
    except ZeroDivisionError:
        ratio = 0.0
    print("{nofetched_c}/{res_c} ({pct:6.2%}) activities out of date".format(
        nofetched_c=total_activities_fetched,
        res_c=total_activities,
        pct=ratio
    ))


@manager.cli.command('download')
def download_cmd():
    """
    Download all IATI data from IATI Data Dump.
    """
    iatikit.download.data()


def download_and_update(ignore_hashes):
    iatikit.download.data()
    update_registry(ignore_hashes)


@manager.cli.command('fetch-dataset-list')
def fetch_dataset_list_cmd():
    """
    Fetches dataset metadata from existing IATI Data Dump cache.
    """
    fetch_dataset_list()


@click.option('--ignore-hashes', is_flag=True,
              help="Ignore hashes in the database, which determine whether \
activities should be updated or not, and update all data. \
This will lead to a full refresh of data.")
@manager.cli.command('download-and-update')
def download_and_update_cmd(ignore_hashes):
    """
    Enqueue a download of all IATI data from
    IATI Data Dump, and then start an update.
    """
    queue = rq.get_queue()
    print("Enqueuing a download from IATI Data Dump")
    queue.enqueue(
        download_and_update,
        args=(ignore_hashes,),
        result_ttl=0,
        job_timeout=100000)


def update_registry(ignore_hashes):
    """
    Get all dataset metadata and update dataset data.
    """
    queue = rq.get_queue()
    datasets = fetch_dataset_list()
    print("Enqueuing %d datasets for update" % datasets.count())
    for dataset in datasets:
        queue.enqueue(update_dataset, args=(dataset.name, ignore_hashes), result_ttl=0)


@click.option('--dataset', 'dataset', type=str,
              help="update a single dataset")
@click.option('--ignore-hashes', is_flag=True,
              help="Ignore hashes in the database, which determine whether \
activities should be updated or not, and update all data. \
This will lead to a full refresh of data.")
@manager.cli.command('update')
def update_cmd(ignore_hashes, dataset=None):
    """
    Step through the downloaded datasets, adding them to the dataset table and
    enqueuing an update job on the Flask job queue. See update_registry and then
    update_dataset for the next actions.
    """
    queue = rq.get_queue()
    if dataset is not None:
        print("Enqueuing {0} for update".format(dataset))
        queue.enqueue(update_dataset, args=(dataset, ignore_hashes), result_ttl=0)
    else:
        print("Enqueuing a full registry update")
        queue.enqueue(update_registry, args=(ignore_hashes,), result_ttl=0)