Skip to content

Commit 4e8c9ff

Browse files
committed
fix autofailover number-sync-standbys
1 parent f626cd5 commit 4e8c9ff

File tree

1 file changed

+67
-7
lines changed
  • platforms/kubernetes/postgres-operator/postgres

1 file changed

+67
-7
lines changed

platforms/kubernetes/postgres-operator/postgres/handle.py

+67-7
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@
139139
DAYS,
140140
)
141141

142+
PRIMARY_FORMATION = " --formation primary "
142143
FIELD_DELIMITER = "-"
143144
WAITING_POSTGRESQL_READY_COMMAND = ["pgtools", "-a"]
144145
INIT_FINISH_MESSAGE = "init postgresql finish"
@@ -1781,7 +1782,7 @@ def delete_postgresql(
17811782
autofailover_switchover(meta, spec, patch, status, logger)
17821783
cmd = ["pgtools", "-D"]
17831784
output = exec_command(conn, cmd, logger, interrupt=False)
1784-
if output.find("ERROR") != -1:
1785+
if output.find("drop auto_failover failed") != -1:
17851786
logger.error("can't delete postgresql instance " + output)
17861787
else:
17871788
cmd = ["pgtools", "-R"]
@@ -2509,6 +2510,7 @@ async def create_cluster(
25092510
# wait a few seconds to prevent the pod not running
25102511
time.sleep(5)
25112512
# cluster running
2513+
update_number_sync_standbys(meta, spec, patch, status, logger)
25122514
set_cluster_status(meta, CLUSTER_CREATE_CLUSTER, CLUSTER_STATUS_RUN,
25132515
logger)
25142516
except Exception as e:
@@ -2788,6 +2790,37 @@ async def timer_cluster(
27882790
await correct_postgresql_password(meta, spec, patch, status, logger)
27892791
await correct_keepalived(meta, spec, patch, status, logger)
27902792

2793+
def update_number_sync_standbys(
2794+
meta: kopf.Meta,
2795+
spec: kopf.Spec,
2796+
patch: kopf.Patch,
2797+
status: kopf.Status,
2798+
logger: logging.Logger,
2799+
) -> None:
2800+
mode, autofailover_replicas, readwrite_replicas, readonly_replicas = get_replicas(
2801+
spec)
2802+
2803+
pg_nodes = readwrite_replicas + readonly_replicas
2804+
number_sync = readwrite_replicas + readonly_replicas if spec[POSTGRESQL][READONLYINSTANCE][STREAMING] == STREAMING_SYNC else readwrite_replicas
2805+
expect_number = number_sync - 2
2806+
if expect_number < 0:
2807+
expect_number = 0
2808+
2809+
if pg_nodes >= 2:
2810+
autofailover_conns = connections(spec, meta, patch,
2811+
get_field(AUTOFAILOVER), False,
2812+
None, logger, None, status, False)
2813+
cmd = [
2814+
"pgtools", "-S",
2815+
"' formation number-sync-standbys " + str(expect_number) + PRIMARY_FORMATION + "'"
2816+
]
2817+
logger.info(f"set number-sync-standbys with cmd {cmd}")
2818+
output = exec_command(autofailover_conns.get_conns()[0], cmd, logger, interrupt=False)
2819+
if output.find(SUCCESS) == -1:
2820+
logger.error(
2821+
f"set number-sync-standbys failed {cmd} {output}")
2822+
autofailover_conns.free_conns()
2823+
27912824

27922825
def update_streaming(
27932826
meta: kopf.Meta,
@@ -2799,20 +2832,27 @@ def update_streaming(
27992832
FIELD: Tuple,
28002833
OLD: Any,
28012834
NEW: Any,
2802-
) -> None:
2835+
) -> bool:
2836+
need_update_number_sync_standbys = False
28032837
if FIELD == DIFF_FIELD_STREAMING:
28042838
if AC != DIFF_CHANGE:
28052839
logger.error(DIFF_FIELD_STREAMING + " only support " + DIFF_CHANGE)
28062840
else:
28072841
#pg_autoctl set node replication-quorum 0 --pgdata /var/lib/postgresql/data/pg_data/
28082842
if NEW == STREAMING_SYNC:
28092843
quorum = 1
2844+
need_update_number_sync_standbys = True
28102845
elif NEW == STREAMING_ASYNC:
28112846
quorum = 0
2847+
# must set number before set async
2848+
logger.info("waiting for update_cluster success on readonly treaming")
2849+
waiting_cluster_final_status(meta, spec, patch, status, logger)
2850+
update_number_sync_standbys(meta, spec, patch, status, logger)
28122851
cmd = [
28132852
"pgtools", "-S",
28142853
"'node replication-quorum " + str(quorum) + "'"
28152854
]
2855+
logger.info(f"set readonly streaming with cmd {cmd}")
28162856
conns = connections(spec, meta, patch,
28172857
get_field(POSTGRESQL, READONLYINSTANCE), False,
28182858
None, logger, None, status, False)
@@ -2823,6 +2863,8 @@ def update_streaming(
28232863
f"set readonly streaming failed {cmd} {output}")
28242864
conns.free_conns()
28252865

2866+
return need_update_number_sync_standbys
2867+
28262868

28272869
def postgresql_action(
28282870
meta: kopf.Meta,
@@ -3077,7 +3119,8 @@ def update_replicas(
30773119
FIELD: Tuple,
30783120
OLD: Any,
30793121
NEW: Any,
3080-
) -> None:
3122+
) -> bool:
3123+
need_update_number_sync_standbys = False
30813124
if FIELD == DIFF_FIELD_READWRITE_REPLICAS:
30823125
if AC != DIFF_CHANGE:
30833126
#raise kopf.TemporaryError("Exception when calling list_pod_for_all_namespaces: %s\n" % e)
@@ -3093,6 +3136,7 @@ def update_replicas(
30933136
meta, spec, patch, status, logger,
30943137
get_field(POSTGRESQL, READWRITEINSTANCE), None, [NEW, OLD],
30953138
True)
3139+
need_update_number_sync_standbys = True
30963140

30973141
if FIELD == DIFF_FIELD_READWRITE_MACHINES:
30983142
if AC != DIFF_CHANGE:
@@ -3109,8 +3153,10 @@ def update_replicas(
31093153
meta, spec, patch, status, logger,
31103154
get_field(POSTGRESQL, READWRITEINSTANCE),
31113155
[i for i in OLD if i not in NEW], None, True)
3112-
delete_services(meta, spec, patch, status, logger)
3113-
create_services(meta, spec, patch, status, logger)
3156+
delete_services(meta, spec, patch, status, logger)
3157+
create_services(meta, spec, patch, status, logger)
3158+
3159+
need_update_number_sync_standbys = True
31143160

31153161
if FIELD == DIFF_FIELD_READONLY_REPLICAS:
31163162
if NEW > OLD:
@@ -3120,6 +3166,7 @@ def update_replicas(
31203166
delete_postgresql_readonly(meta, spec, patch, status, logger,
31213167
get_field(POSTGRESQL, READONLYINSTANCE),
31223168
None, [NEW, OLD], True)
3169+
need_update_number_sync_standbys = True
31233170

31243171
if FIELD == DIFF_FIELD_READONLY_MACHINES:
31253172
if OLD == None or (NEW != None and len(NEW) > len(OLD)):
@@ -3140,6 +3187,10 @@ def update_replicas(
31403187
delete_services(meta, spec, patch, status, logger)
31413188
create_services(meta, spec, patch, status, logger)
31423189

3190+
need_update_number_sync_standbys = True
3191+
3192+
return need_update_number_sync_standbys
3193+
31433194

31443195
def delete_services(
31453196
meta: kopf.Meta,
@@ -3563,6 +3614,7 @@ async def update_cluster(
35633614
logger.info("check update_cluster params")
35643615
check_param(spec, logger, create=False)
35653616
need_roll_update = False
3617+
need_update_number_sync_standbys = False
35663618

35673619
for diff in diffs:
35683620
AC = diff[0]
@@ -3583,8 +3635,10 @@ async def update_cluster(
35833635
OLD = diff[2]
35843636
NEW = diff[3]
35853637

3586-
update_replicas(meta, spec, patch, status, logger, AC, FIELD, OLD,
3638+
return_update_number_sync_standbys = update_replicas(meta, spec, patch, status, logger, AC, FIELD, OLD,
35873639
NEW)
3640+
if need_update_number_sync_standbys == False and return_update_number_sync_standbys == True:
3641+
need_update_number_sync_standbys = True
35883642
update_podspec_volume(meta, spec, patch, status, logger, AC, FIELD,
35893643
OLD, NEW)
35903644
if FIELD[0:len(DIFF_FIELD_SPEC_ANTIAFFINITY
@@ -3606,14 +3660,20 @@ async def update_cluster(
36063660
update_hbas(meta, spec, patch, status, logger, AC, FIELD, OLD, NEW)
36073661
update_users(meta, spec, patch, status, logger, AC, FIELD, OLD,
36083662
NEW)
3609-
update_streaming(meta, spec, patch, status, logger, AC, FIELD, OLD,
3663+
return_update_number_sync_standbys = update_streaming(meta, spec, patch, status, logger, AC, FIELD, OLD,
36103664
NEW)
3665+
if need_update_number_sync_standbys == False and return_update_number_sync_standbys == True:
3666+
need_update_number_sync_standbys = True
36113667
update_configs(meta, spec, patch, status, logger, AC, FIELD, OLD,
36123668
NEW)
36133669

36143670
logger.info("waiting for update_cluster success")
36153671
waiting_cluster_final_status(meta, spec, patch, status, logger)
36163672

3673+
# after waiting_cluster_final_status. update number_sync
3674+
if need_update_number_sync_standbys:
3675+
update_number_sync_standbys(meta, spec, patch, status, logger)
3676+
36173677
# wait a few seconds to prevent the pod not running
36183678
time.sleep(5)
36193679
if spec[ACTION] == ACTION_STOP:

0 commit comments

Comments
 (0)