import sentry_sdk
import urllib.parse
import urllib3
+ import argparse
+ import redis

from codecs import open
from dotenv import load_dotenv
from sqlalchemy import create_engine, exc, MetaData, Table, orm, func, insert, update
from SPARQLWrapper import SPARQLWrapper, JSON
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
+ from datetime import datetime, timedelta

load_dotenv(dotenv_path='.env')
sentry_sdk.init(dsn=os.getenv('SENTRY_DSN_SYNCHRO'))
@@ -110,6 +113,7 @@ class Query(object):
        373074, # suffragan diocese
    ]
    dateformat = '%Y-%m-%d %H:%M:%S'
+     verbosity_level = 0

    def __init__(self):
        self.cache_places = {}
@@ -149,7 +153,7 @@ def split_into_batches(lst, batch_size):

    def fetch(self, file_name, query):
        if os.path.isfile(file_name):
-             if os.path.getmtime(file_name) > time.time() - 12 * 3600: # cache JSON for 12 hours
+             if os.path.getmtime(file_name) > time.time() - 1 * 1800: # cache JSON for 30 mins
                with open(file_name, 'r', encoding='utf-8') as content_file:
                    print('Loading from file', file_name, 'please wait...')
                    return json.loads(content_file.read())
@@ -195,7 +199,7 @@ def extractDiocesesFromSparqlQuery(self, sparqlData):
                'name': Query.ucfirst(label_fr),
                'contactCountryCode': 'fr',
                'website': website,
-                 'wikidataUpdatedAt': str(datetime.datetime.strptime(modified, Query.dateformat)),
+                 'wikidataUpdatedAt': str(datetime.strptime(modified, Query.dateformat)),
            }
        return dioceses

@@ -284,7 +288,7 @@ def extractChurchesFromSparqlQuery(self, sparqlData):
                'name': Query.ucfirst(label_fr),
                'latitude': float(latitude),
                'longitude': float(longitude),
-                 'wikidataUpdatedAt': str(datetime.datetime.strptime(modified, Query.dateformat)),
+                 'wikidataUpdatedAt': str(datetime.strptime(modified, Query.dateformat)),
            }
        return churches

@@ -295,8 +299,10 @@ def update_dioceses(self, sparqlData, client):
            fields = client.populate_fields(wikidataIdDioceses[wikidataId], wikidataId)
            wikidataEntities['wikidataEntities'].append(fields)
        if len(wikidataEntities['wikidataEntities']) > 0:
+             self.print_logs(wikidataEntities['wikidataEntities'], 2)
            response = client.upsert_wikidata_entities('/communities/upsert', wikidataEntities)
-             print('DIOCESES : ', response)
+             self.print_logs(response, 1)
+             return response

    def update_parishes(self, sparqlData, client):
        wikidataEntities = {'wikidataEntities': []}
@@ -305,8 +311,10 @@ def update_parishes(self, sparqlData, client):
            fields = client.populate_fields(wikidataIdParishes[wikidataId], wikidataId)
            wikidataEntities['wikidataEntities'].append(fields)
        if len(wikidataEntities['wikidataEntities']) > 0:
+             self.print_logs(wikidataEntities['wikidataEntities'], 2)
            response = client.upsert_wikidata_entities('/communities/upsert', wikidataEntities)
-             print("PARISHES : ", response)
+             self.print_logs(response, 1)
+             return response

    def update_churches(self, sparqlData, client):
        wikidataEntities = {'wikidataEntities': []}
@@ -315,8 +323,14 @@ def update_churches(self, sparqlData, client):
            fields = client.populate_fields(wikidataIdChurches[wikidataId], wikidataId)
            wikidataEntities['wikidataEntities'].append(fields)
        if len(wikidataEntities['wikidataEntities']) > 0:
+             self.print_logs(wikidataEntities['wikidataEntities'], 2)
            response = client.upsert_wikidata_entities('/places/upsert', wikidataEntities)
-             print("CHURCHES : ", response)
+             self.print_logs(response, 1)
+             return response
+ 
+     def print_logs(self, data, required_level):
+         if self.verbosity_level >= required_level:
+             print(json.dumps(data, indent=4, ensure_ascii=False))

class UuidDoesNotExistException(Exception):
    pass
@@ -330,7 +344,7 @@ class OpenChurchClient(object):
        session = requests.Session()
        # Configure retries and timeouts
        retry_strategy = Retry(
-             total=3,
+             total=1,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
@@ -347,15 +361,15 @@ class OpenChurchClient(object):
        session.request_timeout = (3.05, 27)

    def upsert_wikidata_entities(self, path, body):
-         response = self.session.put(self.hostname + path, json=body, headers=self.headers, verify=False)
-         if response.status_code == 200:
-             data = response.json()
-             return data
-         elif response.status_code == 404 and True:
-             print(response.status_code, response.text, 'for GET', path)
-             raise UuidDoesNotExistException
-         else:
-             print(response.status_code, response.text, 'for PUT', path, body)
+         try:
+             response = self.session.put(self.hostname + path, json=body, headers=self.headers, verify=False)
+             if response.status_code == 200:
+                 data = response.json()
+                 return data
+             else:
+                 print(response.status_code, 'for PUT', path)
+                 return None
+         except requests.exceptions.RequestException as e:
            return None

    def populate_fields(self, values, wikidata_id):
@@ -375,22 +389,64 @@ def populate_fields(self, values, wikidata_id):

def percentage(num, total):
    return '%s = %s%%' % (num, (round(100 * num / total, 2)))

- if __name__ == '__main__':
+ def get_redis_key(type, origin, to):
+     return '%s_%s-%s' % (type, origin, to)
+ 
+ def process_entity(type, batch_size, verbosity_level):
+     redis_url = os.getenv('REDIS_URL')
+     redis_client = redis.from_url(redis_url)
    q = Query()
+     q.verbosity_level = verbosity_level
    client = OpenChurchClient()
-     batch_size = 25
- 
-     # dioceses = q.fetch('wikidata_dioceses.json', dioceses_query)
-     # batches = Query.split_into_batches(dioceses, batch_size)
-     # for batch in batches:
-     #     q.update_dioceses(batch, client)

-     # parishes = q.fetch('wikidata_parishes.json', parishes_query)
-     # batches = Query.split_into_batches(parishes, batch_size)
-     # for batch in batches:
-     #     q.update_parishes(batch, client)
- 
-     churches = q.fetch('wikidata_churches.json', churches_query)
-     batches = Query.split_into_batches(churches, batch_size)
+     print("Starting synchro for", type)
+     if type == "diocese":
+         data = q.fetch('wikidata_dioceses.json', dioceses_query)
+         method = "update_dioceses"
+     elif type == "parish":
+         data = q.fetch('wikidata_parishes.json', parishes_query)
+         method = "update_parishes"
+     elif type == "church":
+         data = q.fetch('wikidata_churches.json', churches_query)
+         method = "update_churches"
+     else:
+         raise ValueError("Unrecognized entity type")
+ 
+     batches = Query.split_into_batches(data, batch_size)
+     iteration = 1
    for batch in batches:
-         q.update_churches(batch, client)
+         can_process = True
+         key = get_redis_key(type, iteration - 1, len(batch))
+         value = redis_client.hgetall(key)
+         if value:
+             # a key exists: check whether this batch may be processed again
+             decoded_data = {key.decode('utf-8'): value.decode('utf-8') for key, value in value.items()}
+             if decoded_data.get('status') in {'success', 'error'}:
+                 time_diff = datetime.now() - datetime.strptime(decoded_data.get('updatedAt'), "%Y-%m-%d %H:%M:%S.%f")
+                 if time_diff <= timedelta(minutes=30):
+                     can_process = False # the batch was updated less than 30 minutes ago, so skip it
+ 
+         if can_process:
+             redis_client.hset(key, "status", "processing")
+             redis_client.hset(key, "updatedAt", str(datetime.now()))
+             print("Processing batch %s/%s" % (iteration, len(batches)))
+             res = getattr(q, method)(batch, client)
+             if res:
+                 success_count = sum(1 for value in res.values() if value in {'Updated', 'Inserted'})
+                 redis_client.hset(key, "successCount", success_count)
+                 redis_client.hset(key, "failureCount", len(res) - success_count)
+                 redis_client.hset(key, "status", "success")
+                 print(success_count, len(res) - success_count)
+             else:
+                 redis_client.hset(key, "status", "error")
+         else:
+             print("Skipping batch %s/%s" % (iteration, len(batches)))
+         iteration += 1
+     print("Ended synchro for", type)
+ 
+ if __name__ == '__main__':
+     parser = argparse.ArgumentParser()
+     parser.add_argument("--entity-only", type=str, required=True, choices=["parish", "diocese", "church"], help="Specify the entity to process: 'diocese', 'parish' or 'church'")
+     parser.add_argument("-v", "--verbose", action="count", default=0, help="Increase the verbosity level (use -vvv for more detail).")
+     args = parser.parse_args()
+     process_entity(args.entity_only, 100, args.verbose)
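
Usage note: a minimal invocation sketch, assuming the script in this diff is saved as synchro.py (the file name is not shown here):

    python synchro.py --entity-only church -vv

--entity-only selects which Wikidata entity type to synchronize, repeated -v flags raise the verbosity consumed by Query.print_logs, and the batch size is fixed at 100 in __main__. Each batch's progress is tracked in a Redis hash keyed by get_redis_key, so rerunning the script within 30 minutes skips batches already marked success or error.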