@@ -330,7 +330,7 @@ def get_primary_host(
    return output.strip()


-def waiting_cluster_correct_status(
+def waiting_cluster_final_status(
    meta: kopf.Meta,
    spec: kopf.Spec,
    patch: kopf.Patch,
@@ -341,7 +341,80 @@ def waiting_cluster_correct_status(
        return

    # waiting for restart
-    time.sleep(5)
+    auto_failover_conns = connections(spec, meta, patch,
+                                      get_field(AUTOFAILOVER), False, None,
+                                      logger, None, status, False)
+    for conn in auto_failover_conns.get_conns():
+        not_correct_cmd = [
+            "pgtools", "-w", "0", "-Q", "pg_auto_failover", "-q",
+            '''" select count(*) from pgautofailover.node where reportedstate <> 'primary' and reportedstate <> 'secondary' and reportedstate <> 'single' "'''
+        ]
+        primary_cmd = [
+            "pgtools", "-w", "0", "-Q", "pg_auto_failover", "-q",
+            '''" select count(*) from pgautofailover.node where reportedstate = 'primary' or reportedstate = 'single' "'''
+        ]
+        nodes_cmd = [
+            "pgtools", "-w", "0", "-Q", "pg_auto_failover", "-q",
+            '''" select count(*) from pgautofailover.node "'''
+        ]
+
+        i = 0
+        maxtry = 60
+        while True:
+            logger.info(
+                f"waiting auto_failover cluster final status, {i} times. ")
+            i += 1
+            time.sleep(1)
+            if i >= maxtry:
+                logger.warning(
+                    f"cluster maybe maybe not right. skip waitting.")
+                break
+            output = exec_command(conn, primary_cmd, logger, interrupt=False)
+            if output != '1':
+                logger.warning(
+                    f"not find primary node in autofailover, output is {output}"
+                )
+                continue
+            output = exec_command(conn,
+                                  not_correct_cmd,
+                                  logger,
+                                  interrupt=False)
+            if output != '0':
+                logger.warning(
+                    f"there are {output} nodes is not primary/secondary/single"
+                )
+                continue
+
+            if conn.get_machine() == None:
+                total_nodes = int(
+                    spec[POSTGRESQL][READWRITEINSTANCE][REPLICAS]) + int(
+                        spec[POSTGRESQL][READONLYINSTANCE][REPLICAS])
+            else:
+                total_nodes = len(
+                    spec.get(POSTGRESQL).get(READWRITEINSTANCE).get(MACHINES)
+                ) + len(
+                    spec.get(POSTGRESQL).get(READONLYINSTANCE).get(MACHINES))
+            output = exec_command(conn, nodes_cmd, logger, interrupt=False)
+            if output != str(total_nodes):
+                logger.warning(
+                    f"there are {output} nodes in autofailover, expect {total_nodes} nodes"
+                )
+                continue
+
+            break
+    auto_failover_conns.free_conns()
+
+
+def waiting_cluster_correct_status(
+    meta: kopf.Meta,
+    spec: kopf.Spec,
+    patch: kopf.Patch,
+    status: kopf.Status,
+    logger: logging.Logger,
+) -> None:
+    if spec[ACTION] == ACTION_STOP:
+        return
+
    auto_failover_conns = connections(spec, meta, patch,
                                      get_field(AUTOFAILOVER), False, None,
                                      logger, None, status, False)
@@ -370,15 +443,6 @@ def waiting_cluster_correct_status(
                logger.warning(
                    f"cluster maybe maybe not right. skip waitting.")
                break
-            #total_nodes = int(
-            #    spec[POSTGRESQL][READWRITEINSTANCE][REPLICAS]) + int(
-            #        spec[POSTGRESQL][READONLYINSTANCE][REPLICAS]) # TODO machinemode len(machines)
-            #output = exec_command(conn, nodes_cmd, logger, interrupt=False)
-            #if output != str(total_nodes):
-            #    logger.warning(
-            #        f"there are {output} nodes in autofailover, expect {total_nodes} nodes"
-            #    )
-            #    continue
            output = exec_command(conn, primary_cmd, logger, interrupt=False)
            if output != '1':
                logger.warning(
@@ -395,12 +459,28 @@ def waiting_cluster_correct_status(
                )
                continue

+            if conn.get_machine() == None:
+                total_nodes = int(
+                    spec[POSTGRESQL][READWRITEINSTANCE][REPLICAS]) + int(
+                        spec[POSTGRESQL][READONLYINSTANCE][REPLICAS])
+            else:
+                total_nodes = len(
+                    spec.get(POSTGRESQL).get(READWRITEINSTANCE).get(MACHINES)
+                ) + len(
+                    spec.get(POSTGRESQL).get(READONLYINSTANCE).get(MACHINES))
+            output = exec_command(conn, nodes_cmd, logger, interrupt=False)
+            if output != str(total_nodes):
+                logger.warning(
+                    f"there are {output} nodes in autofailover, expect {total_nodes} nodes"
+                )
+                continue
+
            break
    auto_failover_conns.free_conns()


def waiting_postgresql_ready(conns: InstanceConnections,
-                             logger: logging.Logger):
+                             logger: logging.Logger) -> bool:
    for conn in conns.get_conns():
        i = 0
        maxtry = 300
@@ -416,11 +496,10 @@ def waiting_postgresql_ready(conns: InstanceConnections,
                    f"postgresql is not ready. try {i} times. {output}")
                if i >= maxtry:
                    logger.warning(f"postgresql is not ready. skip waitting.")
-                    break
+                    return False
            else:
                break
-    # wait service refresh endpoint
-    time.sleep(10)
+    return True


def waiting_instance_ready(conns: InstanceConnections, logger: logging.Logger):
@@ -1210,7 +1289,8 @@ def exec_command(conn: InstanceConnection,
    if conn.get_machine() != None:
        return docker_exec_command(conn.get_machine().get_role(),
                                   conn.get_machine().get_ssh(), cmd, logger,
-                                   interrupt, user)
+                                   interrupt, user,
+                                   conn.get_machine().get_host())


def pod_exec_command(name: str,
@@ -1248,7 +1328,8 @@ def docker_exec_command(role: str,
                        cmd: [str],
                        logger: logging.Logger,
                        interrupt: bool = True,
-                        user: str = "root") -> str:
+                        user: str = "root",
+                        host: str = None) -> str:
    if role == AUTOFAILOVER:
        machine_data_path = operator_config.DATA_PATH_AUTOFAILOVER
    if role == POSTGRESQL:
@@ -1257,12 +1338,13 @@ def docker_exec_command(role: str,
        workdir = os.path.join(machine_data_path, DOCKER_COMPOSE_DIR)
        #cmd = "cd " + workdir + "; docker-compose exec " + role + " " + " ".join(cmd)
        cmd = "docker exec " + role + " " + " ".join(['gosu', user] + cmd)
-        logger.info(f"docker exec command {cmd}")
+        logger.info(f"docker exec command {cmd} on host {host}")
        ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(cmd, get_pty=True)
    except Exception as e:
        if interrupt:
            raise kopf.PermanentError(f"can't run command: {cmd} , {e}")
        else:
+            logger.error(f"can't run command: {cmd} , {e}")
            return FAILED

    # see pod_exec_command, don't check ret_code
@@ -1893,7 +1975,7 @@ def create_services(
            read_vip = service[VIP]
        elif service[SELECTOR] == SERVICE_STANDBY_READONLY:
            machines = spec.get(POSTGRESQL).get(READWRITEINSTANCE).get(
-                MACHINES)
+                MACHINES).copy()
            machines += spec.get(POSTGRESQL).get(READONLYINSTANCE).get(
                MACHINES)
            read_vip = service[VIP]
@@ -1926,7 +2008,10 @@ def create_services(
        conns = connections(spec, meta, patch,
                            get_field(POSTGRESQL, READWRITEINSTANCE), False,
                            None, logger, None, status, False)
-        for conn in conns.get_conns():
+        readonly_conns = connections(spec, meta, patch,
+                                     get_field(POSTGRESQL, READONLYINSTANCE),
+                                     False, None, logger, None, status, False)
+        for conn in (conns.get_conns() + readonly_conns.get_conns()):
            machine_sftp_put(conn.get_machine().get_sftp(), lvs_conf,
                             KEEPALIVED_CONF)
            machine_exec_command(
@@ -1935,6 +2020,7 @@ def create_services(
            machine_exec_command(conn.get_machine().get_ssh(),
                                 START_KEEPALIVED)
        conns.free_conns()
+        readonly_conns.free_conns()


def check_param(spec: kopf.Spec,
@@ -2005,7 +2091,9 @@ async def create_postgresql_cluster(
    #conns = connections(spec, meta, patch,
    #                    get_field(POSTGRESQL, READWRITEINSTANCE), False, None,
    #                    logger, None, status, False)
-    #create_users(meta, spec, patch, status, logger, conns)
+    #if conns.get_conns()[0].get_machine() != None:
+    #    waiting_postgresql_ready(conns, logger)
+    #    waiting_cluster_final_status(meta, spec, patch, status, logger)
    #conns.free_conns()

    # create postgresql & readonly node
@@ -2033,7 +2121,7 @@ async def create_cluster(
        await create_postgresql_cluster(meta, spec, patch, status, logger)

        logger.info("waiting for create_cluster success")
-        waiting_cluster_correct_status(meta, spec, patch, status, logger)
+        waiting_cluster_final_status(meta, spec, patch, status, logger)

        # wait a few seconds to prevent the pod not running
        time.sleep(5)
@@ -2217,18 +2305,25 @@ async def correct_keepalived(
    status: kopf.Status,
    logger: logging.Logger,
) -> None:
-    readwrite_conns = connections(spec, meta, patch,
-                                  get_field(POSTGRESQL, READWRITEINSTANCE),
-                                  False, None, logger, None, status, False)
-    for conn in readwrite_conns.get_conns():
+    conns = connections(spec, meta, patch,
+                        get_field(POSTGRESQL, READWRITEINSTANCE), False, None,
+                        logger, None, status, False)
+    readonly_conns = connections(spec, meta, patch,
+                                 get_field(POSTGRESQL, READONLYINSTANCE),
+                                 False, None, logger, None, status, False)
+    for conn in (conns.get_conns() + readonly_conns.get_conns()):
+        if conn.get_machine() == None:
+            break
+
        output = machine_exec_command(conn.get_machine().get_ssh(),
                                      STATUS_KEEPALIVED,
                                      interrupt=False)
        if output.find("Active: active (running)") == -1:
            delete_services(meta, spec, patch, status, logger)
            create_services(meta, spec, patch, status, logger)
            break
-    readwrite_conns.free_conns()
+    conns.free_conns()
+    readonly_conns.free_conns()


async def correct_postgresql_role(
@@ -2591,11 +2686,15 @@ def delete_services(
    conns = connections(spec, meta, patch,
                        get_field(POSTGRESQL, READWRITEINSTANCE), False,
                        None, logger, None, status, False)
-    for conn in conns.get_conns():
+    readonly_conns = connections(spec, meta, patch,
+                                 get_field(POSTGRESQL, READONLYINSTANCE),
+                                 False, None, logger, None, status, False)
+    for conn in (conns.get_conns() + readonly_conns.get_conns()):
+        machine_exec_command(conn.get_machine().get_ssh(), STOP_KEEPALIVED)
        machine_exec_command(conn.get_machine().get_ssh(),
                             "rm -rf " + KEEPALIVED_CONF)
-        machine_exec_command(conn.get_machine().get_ssh(), STOP_KEEPALIVED)
    conns.free_conns()
+    readonly_conns.free_conns()


def update_service(
@@ -2677,26 +2776,44 @@ def update_configs(

    waiting_postgresql_ready(readwrite_conns, logger)
    waiting_postgresql_ready(readonly_conns, logger)
-    logger.info("update configs(" + str(cmd) + ")")
+    primary_conn = None
    for conn in conns:
-        if get_primary_host(
-                meta, spec, patch, status,
-                logger) == get_connhost(conn) and int(
-                    spec[POSTGRESQL][READWRITEINSTANCE][REPLICAS]) > 1:
-            autofailover_switchover(meta, spec, patch, status, logger)
-
-            waiting_postgresql_ready(readwrite_conns, logger)
-            waiting_postgresql_ready(readonly_conns, logger)
+        if get_primary_host(meta, spec, patch, status,
+                            logger) == get_connhost(conn):
+            primary_conn = conn
+            continue

+        #if conn.get_machine() != None:
+        #    replicas = len(spec[POSTGRESQL][READWRITEINSTANCE][MACHINES])
+        #else:
+        #    replicas = int(spec[POSTGRESQL][READWRITEINSTANCE][REPLICAS])
+        #if replicas > 1 and get_primary_host( meta, spec, patch, status, logger) == get_connhost(conn):
+        #    autofailover_switchover(meta, spec, patch, status, logger)
+        #    if port_change == True or restart_postgresql == True:
+        #        waiting_cluster_correct_status(meta, spec, patch, status, logger)
+        #    else:
+        #        waiting_cluster_final_status(meta, spec, patch, status, logger)
+
+        logger.info("update configs (" + str(cmd) +
+                    ") on %s " % get_connhost(conn))
        output = exec_command(conn, cmd, logger, interrupt=False)
        if output.find(SUCCESS) == -1:
            logger.error(f"update configs {cmd} failed. {output}")

    #if port_change == True or restart_postgresql == True:
-    if port_change == True:
-        logger.info(f"wait readwrite instance update finish")
-        waiting_cluster_correct_status(meta, spec, patch, status,
-                                       logger)
+    #    time.sleep(10)
+    #if port_change == True:
+    #    waiting_cluster_correct_status(meta, spec, patch, status,
+    #                                   logger)
+    if port_change == True or restart_postgresql == True:
+        waiting_cluster_correct_status(meta, spec, patch, status, logger)
+        time.sleep(6)
+
+    logger.info("update configs (" + str(cmd) +
+                ") on %s " % get_connhost(primary_conn))
+    output = exec_command(primary_conn, cmd, logger, interrupt=False)
+    if output.find(SUCCESS) == -1:
+        logger.error(f"update configs {cmd} failed. {output}")

    if port_change == True:
        delete_services(meta, spec, patch, status, logger)
@@ -2864,24 +2981,39 @@ async def update_cluster(
            NEW = diff[3]

            logger.info(diff)
-            update_replicas(meta, spec, patch, status, logger, AC, FIELD, OLD,
-                            NEW)
+
            update_action(meta, spec, patch, status, logger, AC, FIELD, OLD,
                          NEW)
            update_service(meta, spec, patch, status, logger, AC, FIELD, OLD,
                           NEW)
+
+        for diff in diffs:
+            AC = diff[0]
+            FIELD = diff[1]
+            OLD = diff[2]
+            NEW = diff[3]
+
+            update_replicas(meta, spec, patch, status, logger, AC, FIELD, OLD,
+                            NEW)
+            update_podspec_volume(meta, spec, patch, status, logger, AC, FIELD,
+                                  OLD, NEW)
+
+        for diff in diffs:
+            AC = diff[0]
+            FIELD = diff[1]
+            OLD = diff[2]
+            NEW = diff[3]
+
            update_hbas(meta, spec, patch, status, logger, AC, FIELD, OLD, NEW)
            update_users(meta, spec, patch, status, logger, AC, FIELD, OLD,
                         NEW)
            update_streaming(meta, spec, patch, status, logger, AC, FIELD, OLD,
                             NEW)
-            update_podspec_volume(meta, spec, patch, status, logger, AC, FIELD,
-                                  OLD, NEW)
            update_configs(meta, spec, patch, status, logger, AC, FIELD, OLD,
                           NEW)

        logger.info("waiting for update_cluster success")
-        waiting_cluster_correct_status(meta, spec, patch, status, logger)
+        waiting_cluster_final_status(meta, spec, patch, status, logger)

        # wait a few seconds to prevent the pod not running
        time.sleep(5)
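Note on the new wait loop: `waiting_cluster_final_status` polls the pg_auto_failover monitor with three `pgtools` queries (exactly one node reporting `primary`/`single`, no node in a transitional `reportedstate`, and a node count matching the spec), retrying once per second up to `maxtry` before giving up. Below is a minimal standalone sketch of that bounded poll-and-retry pattern; the helper name and the placeholder predicates are illustrative assumptions, not the operator's actual API.

```python
# Sketch only: a generic bounded poll loop in the spirit of
# waiting_cluster_final_status. The check callables stand in for the
# three pgtools queries (primary_cmd, not_correct_cmd, nodes_cmd).
import logging
import time
from typing import Callable, Sequence


def wait_until_final(checks: Sequence[Callable[[], bool]],
                     maxtry: int = 60,
                     interval: float = 1.0,
                     logger: logging.Logger = logging.getLogger(__name__)) -> bool:
    """Retry until every check passes; give up after maxtry rounds."""
    for attempt in range(1, maxtry + 1):
        if all(check() for check in checks):
            return True  # cluster reached its final state
        logger.info("cluster not in final state yet, attempt %d/%d",
                    attempt, maxtry)
        time.sleep(interval)
    logger.warning("cluster may still not be in final state; skip waiting")
    return False


if __name__ == "__main__":
    # Placeholder predicates; in the operator these would run the SQL
    # queries against pgautofailover.node via exec_command.
    ok = wait_until_final([
        lambda: True,  # exactly one node reports 'primary' or 'single'
        lambda: True,  # no node in a transitional reportedstate
        lambda: True,  # node count matches the spec
    ], maxtry=3)
    print("final status reached:", ok)
```

The same structure explains why the diff replaces the fixed `time.sleep(5)` with the query loop: the wait ends as soon as all conditions hold, and a warning is logged if `maxtry` is exhausted.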