138
138
MINUTES ,
139
139
HOURS ,
140
140
DAYS ,
141
+ UPDATE_TOLERATION ,
141
142
)
142
143
143
144
PGLOG_DIR = "log"
@@ -417,9 +418,12 @@ def waiting_cluster_final_status(
417
418
status : kopf .Status ,
418
419
logger : logging .Logger ,
419
420
timeout : int = MINUTES * 1 ,
420
- ) -> None :
421
+ except_nodes : int = None ,
422
+ ) -> bool :
423
+ is_health = True
424
+
421
425
if spec [ACTION ] == ACTION_STOP :
422
- return
426
+ return is_health
423
427
424
428
# waiting for restart
425
429
auto_failover_conns = connections (spec , meta , patch ,
@@ -449,6 +453,7 @@ def waiting_cluster_final_status(
449
453
if i >= maxtry :
450
454
logger .warning (
451
455
f"cluster maybe maybe not right. skip waitting." )
456
+ is_health = False
452
457
break
453
458
output = exec_command (conn , primary_cmd , logger , interrupt = False )
454
459
if output != '1' :
@@ -475,6 +480,8 @@ def waiting_cluster_final_status(
475
480
spec .get (POSTGRESQL ).get (READWRITEINSTANCE ).get (MACHINES )
476
481
) + len (
477
482
spec .get (POSTGRESQL ).get (READONLYINSTANCE ).get (MACHINES ))
483
+ if except_nodes is not None :
484
+ total_nodes = except_nodes
478
485
output = exec_command (conn , nodes_cmd , logger , interrupt = False )
479
486
if output != str (total_nodes ):
480
487
logger .warning (
@@ -484,6 +491,7 @@ def waiting_cluster_final_status(
484
491
485
492
break
486
493
auto_failover_conns .free_conns ()
494
+ return is_health
487
495
488
496
489
497
def waiting_cluster_correct_status (
@@ -3321,7 +3329,7 @@ def update_antiaffinity(
3321
3329
timeout : int = MINUTES * 5 ,
3322
3330
) -> None :
3323
3331
# local volume
3324
- if spec .get (SPEC_VOLUME_TYPE ) == SPEC_VOLUME_LOCAL :
3332
+ if spec .get (SPEC_VOLUME_TYPE , 'local' ) == SPEC_VOLUME_LOCAL :
3325
3333
delete_disk = True
3326
3334
timeout = HOURS * 1
3327
3335
rolling_update (meta , spec , patch , status , logger , target_roles , exit ,
@@ -3408,6 +3416,8 @@ def update_replicas(
3408
3416
3409
3417
need_update_number_sync_standbys = True
3410
3418
3419
+ waiting_cluster_final_status (meta , spec , patch , status , logger , 1 * HOURS )
3420
+
3411
3421
return need_update_number_sync_standbys
3412
3422
3413
3423
@@ -4019,6 +4029,48 @@ def local_create_user(OS: List,
4019
4029
auto_failover_conns .free_conns ()
4020
4030
4021
4031
4032
+ def get_except_nodes (
4033
+ meta : kopf .Meta ,
4034
+ spec : kopf .Spec ,
4035
+ patch : kopf .Patch ,
4036
+ status : kopf .Status ,
4037
+ logger : logging .Logger ,
4038
+ diffs : kopf .Diff ,
4039
+ ) -> int :
4040
+ mode , autofailover_replicas , readwrite_replicas , readonly_replicas = get_replicas (
4041
+ spec )
4042
+ except_readwrite_nodes = readwrite_replicas
4043
+ except_readonly_nodes = readonly_replicas
4044
+
4045
+ for diff in diffs :
4046
+ AC = diff [0 ]
4047
+ FIELD = diff [1 ]
4048
+ OLD = diff [2 ]
4049
+ NEW = diff [3 ]
4050
+
4051
+ if FIELD == DIFF_FIELD_READWRITE_REPLICAS :
4052
+ if AC != DIFF_CHANGE :
4053
+ logger .error (
4054
+ str (DIFF_FIELD_ACTION ) + " only support " + DIFF_CHANGE )
4055
+ else :
4056
+ except_readwrite_nodes = OLD
4057
+
4058
+ if FIELD == DIFF_FIELD_READWRITE_MACHINES :
4059
+ if AC != DIFF_CHANGE :
4060
+ logger .error (
4061
+ str (DIFF_FIELD_ACTION ) + " only support " + DIFF_CHANGE )
4062
+ else :
4063
+ except_readwrite_nodes = len (OLD )
4064
+
4065
+ if FIELD == DIFF_FIELD_READONLY_REPLICAS :
4066
+ except_readwrite_nodes = OLD
4067
+
4068
+ if FIELD == DIFF_FIELD_READONLY_MACHINES :
4069
+ except_readwrite_nodes = len (OLD )
4070
+
4071
+ return except_readwrite_nodes + except_readonly_nodes
4072
+
4073
+
4022
4074
# kubectl patch pg lzzhang --patch '{"spec": {"action": "stop"}}' --type=merge
4023
4075
def update_cluster (
4024
4076
meta : kopf .Meta ,
@@ -4035,6 +4087,8 @@ def update_cluster(
4035
4087
check_param (spec , logger , create = False )
4036
4088
need_roll_update = False
4037
4089
need_update_number_sync_standbys = False
4090
+ update_toleration = spec .get (UPDATE_TOLERATION , False )
4091
+ except_nodes = get_except_nodes (meta , spec , patch , status , logger , diffs )
4038
4092
4039
4093
for diff in diffs :
4040
4094
AC = diff [0 ]
@@ -4055,10 +4109,25 @@ def update_cluster(
4055
4109
OLD = diff [2 ]
4056
4110
NEW = diff [3 ]
4057
4111
4112
+ if update_toleration == False and waiting_cluster_final_status (meta , spec , patch , status , logger , except_nodes = except_nodes ) == False :
4113
+ logger .error (f"cluster status is not health." )
4114
+ raise kopf .PermanentError (f"cluster status is not health." )
4115
+
4058
4116
return_update_number_sync_standbys = update_replicas (meta , spec , patch , status , logger , AC , FIELD , OLD ,
4059
4117
NEW )
4060
4118
if need_update_number_sync_standbys == False and return_update_number_sync_standbys == True :
4061
4119
need_update_number_sync_standbys = True
4120
+
4121
+ for diff in diffs :
4122
+ AC = diff [0 ]
4123
+ FIELD = diff [1 ]
4124
+ OLD = diff [2 ]
4125
+ NEW = diff [3 ]
4126
+
4127
+ if update_toleration == False and waiting_cluster_final_status (meta , spec , patch , status , logger ) == False :
4128
+ logger .error (f"cluster status is not health." )
4129
+ raise kopf .PermanentError (f"cluster status is not health." )
4130
+
4062
4131
update_podspec_volume (meta , spec , patch , status , logger , AC , FIELD ,
4063
4132
OLD , NEW )
4064
4133
if FIELD [0 :len (DIFF_FIELD_SPEC_ANTIAFFINITY
@@ -4077,6 +4146,10 @@ def update_cluster(
4077
4146
OLD = diff [2 ]
4078
4147
NEW = diff [3 ]
4079
4148
4149
+ if update_toleration == False and waiting_cluster_final_status (meta , spec , patch , status , logger ) == False :
4150
+ logger .error (f"cluster status is not health." )
4151
+ raise kopf .PermanentError (f"cluster status is not health." )
4152
+
4080
4153
update_hbas (meta , spec , patch , status , logger , AC , FIELD , OLD , NEW )
4081
4154
update_users (meta , spec , patch , status , logger , AC , FIELD , OLD ,
4082
4155
NEW )
0 commit comments