@@ -168,7 +168,7 @@ def fetch(self, file_name, query):
        sparql.setReturnFormat(JSON)

        with open(file_name, 'w', encoding='utf-8') as f:
-            f.write('[')  # Début du tableau JSON
+            f.write('[')
            first_batch = True
            while True:
                try:
@@ -177,24 +177,22 @@ def fetch(self, file_name, query):
                    data = sparql.query().convert()

                    results = data['results']['bindings']
-                    if not results:  # Si aucun résultat, arrêter la boucle
+                    if not results:  # no more results: stop the loop
                        break

-                    # Ajouter les résultats au fichier
+                    # append the results to the file
                    for result in results:
                        if not first_batch:
-                            f.write(',')  # Ajouter une virgule entre les objets JSON
+                            f.write(',')
                        json.dump(result, f, ensure_ascii=False)
                        first_batch = False
-
                    offset += batch_size
-
                except Exception as e:
-                    print(f"Échec du chargement des données entre {offset} et {offset + batch_size} : {e}")
-                    break
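+                    # on failure, log the window that failed and skip it rather than aborting the whole fetch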
+                    print(f"Failed to load data from {offset} to {offset + batch_size}: {e}")
+                    offset += batch_size
            f.write(']')  # Fin du tableau JSON

-        print(f'Données écrites dans le fichier {file_name}.')
+        print(f'Data written to {file_name}.')
        with open(file_name, 'r', encoding='utf-8') as content_file:
            return json.loads(content_file.read())

@@ -414,67 +412,99 @@ def populate_fields(self, values, wikidata_id):
            'explanation': 'https://www.wikidata.org/wiki/Q' + format(wikidata_id),
        })
        return fields
+
+class Processor(object):
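+    """Drives a Wikidata synchronisation run for one entity type, recording
+    per-batch progress in Redis so that an interrupted run can be resumed."""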
+    client = OpenChurchClient()
+    q = Query()
+    redis_url = os.getenv('REDIS_URL')
+    redis_client = redis.from_url(redis_url)
+    verbosity_level = 0
+    type = 'diocese'
+    batch_size = 100
+
+    def process_batch(self, data, method, run_id):
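+        """Runs `method` on each batch of `data`, storing per-batch status, counters
+        and the current runId in Redis, and skipping batches already done in this run."""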
+        batches = Query.split_into_batches(data, self.batch_size)
+        self.redis_client.hset(self.type, "batchCount", len(batches))
+        iteration = 1
+        for batch in batches:
+            can_process = True
+            self.redis_client.hset(self.type, "currentBatch", iteration)
+            # one Redis hash per batch, keyed "<type>_<from>-<to>"
+            key_batch = get_redis_key(self.type, (iteration - 1) * self.batch_size, iteration * self.batch_size)
+            value_batch = self.redis_client.hgetall(key_batch)
+            if value_batch:
+                # a key exists: check whether this batch still needs processing
+                decoded_data = {key.decode('utf-8'): value.decode('utf-8') for key, value in value_batch.items()}
+                current_run_id = decoded_data.get('runId')
+                if current_run_id == str(run_id):
+                    can_process = False  # already processed during this run, skip it
+            if can_process:
+                self.redis_client.hset(key_batch, "status", "processing")
+                self.redis_client.hset(key_batch, "updatedAt", str(datetime.now()))
+                self.redis_client.hset(key_batch, "runId", run_id)
+                print("Processing batch %s/%s" % (iteration, len(batches)))
+                res = getattr(self.q, method)(batch, self.client)
+                if res:
+                    success_count = sum(1 for value in res.values() if value in {'Updated', 'Inserted'})
+                    self.redis_client.hset(key_batch, "successCount", success_count)
+                    self.redis_client.hset(key_batch, "failureCount", len(res) - success_count)
+                    self.redis_client.hset(key_batch, "status", "success")
+                else:
+                    self.redis_client.hset(key_batch, "status", "error")
+            else:
+                print("Ignore batch %s/%s" % (iteration, len(batches)))
+            iteration += 1
+
+    def process_entity(self):
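+        """Fetches the Wikidata dump for the configured entity type and pushes it batch
+        by batch, resuming the previous run if it is still marked as processing."""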
+        print("starting synchro for", self.type)
+        if self.type == "diocese":
+            data = self.q.fetch('wikidata_dioceses.json', dioceses_query)
+            method = "update_dioceses"
+        elif self.type == "parish":
+            data = self.q.fetch('wikidata_parishes.json', parishes_query)
+            method = "update_parishes"
+        elif self.type == "church":
+            data = self.q.fetch('wikidata_churches.json', churches_query)
+            method = "update_churches"
+        else:
+            raise ValueError("Unknown entity type %s" % self.type)
+
+        value_entity = self.redis_client.hgetall(self.type)
+        if value_entity:
+            decoded_data = {key.decode('utf-8'): value.decode('utf-8') for key, value in value_entity.items()}
+            run_id = decoded_data.get('runId')
+            if decoded_data.get('status') == 'processing':
+                # previous run did not finish: resume it under the same runId
+                self.process_batch(data, method, run_id)
+            else:
+                # previous run finished: start a new run with an incremented runId
+                self.clean_entity(int(run_id) + 1)
+                self.process_batch(data, method, int(run_id) + 1)
+        else:
+            # no previous state for this entity type: start run 1 from scratch
+            self.clean_entity(1)
+            self.process_batch(data, method, 1)
+
+        self.redis_client.hset(self.type, "status", "success")
+        self.redis_client.hset(self.type, "endDate", str(datetime.now()))
+        print("ended synchro for", self.type)
+
+    def clean_entity(self, run_id):
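+        """Resets the Redis hash for this entity type at the start of a new run."""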
+        self.redis_client.hset(self.type, "runId", run_id)
+        self.redis_client.hset(self.type, "startDate", str(datetime.now()))
+        self.redis_client.hset(self.type, "status", "processing")
+        self.redis_client.hset(self.type, "batchSize", self.batch_size)
+        self.redis_client.hdel(self.type, "endDate")

def percentage(num, total):
    return '%s = %s%%' % (num, (round(100 * num / total, 2)))

def get_redis_key(type, origin, to):
    return '%s_%s-%s' % (type, origin, to)

-def process_entity(type, batch_size, verbosity_level):
-    redis_url = os.getenv('REDIS_URL')
-    redis_client = redis.from_url(redis_url)
-    q = Query()
-    q.verbosity_level = verbosity_level
-    client = OpenChurchClient()
-
-    print("starting synchro for", type)
-    if type == "diocese":
-        data = q.fetch('wikidata_dioceses.json', dioceses_query)
-        method = "update_dioceses"
-    elif type == "parish":
-        data = q.fetch('wikidata_parishes.json', parishes_query)
-        method = "update_parishes"
-    elif type == "church":
-        data = q.fetch('wikidata_churches.json', churches_query)
-        method = "update_churches"
-    else:
-        raise ("Type d'entité non reconnu")
-
-    batches = Query.split_into_batches(data, batch_size)
-    iteration = 1
-    for batch in batches:
-        can_process = True
-        key = get_redis_key(type, iteration - 1, len(batch))
-        value = redis_client.hgetall(key)
-        if value:
-            # A key exist. We chek if we can process it
-            decoded_data = {key.decode('utf-8'): value.decode('utf-8') for key, value in value.items()}
-            if decoded_data.get('status') in {'success', 'error'}:
-                time_diff = datetime.now() - datetime.strptime(decoded_data.get('updatedAt'), "%Y-%m-%d %H:%M:%S.%f")
-                if time_diff <= timedelta(minutes=30):
-                    can_process = False  # We updated the batch less than 30 minutes ago. We skip it
-
-        if can_process:
-            redis_client.hset(key, "status", "processing")
-            redis_client.hset(key, "updatedAt", str(datetime.now()))
-            print("Processing batch %s/%s" % (iteration, len(batches) + 1))
-            res = getattr(q, method)(batch, client)
-            if res:
-                success_count = sum(1 for value in res.values() if value in {'Updated', 'Inserted'})
-                redis_client.hset(key, "successCount", success_count)
-                redis_client.hset(key, "failureCount", len(res) - success_count)
-                redis_client.hset(key, "status", "success")
-            else:
-                redis_client.hset(key, "status", "error")
-        else:
-            print("Ignore batch %s/%s" % (iteration, len(batches) + 1))
-        iteration += 1
-    print("ended synchro for", type)
-
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--entity-only", type=str, required=True, choices=["parish", "diocese", "church"], help="Spécifiez l'entité à traiter : 'diocese', 'parish' ou 'church'")
    parser.add_argument("-v", "--verbose", action="count", default=0, help="Augmente le niveau de verbosité (utilisez -vvv pour plus de détails).")
    args = parser.parse_args()
-    process_entity(args.entity_only, 100, args.verbose)
+
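+    # build a Processor from the CLI arguments and launch the synchronisation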
+    processor = Processor()
+    processor.verbosity_level = args.verbose
+    processor.type = args.entity_only
+    processor.process_entity()