From 8e16af64662f1c687944995f7c56294129e81ff0 Mon Sep 17 00:00:00 2001 From: yanbo <yanbo2695388808@gmail.com> Date: Thu, 23 Mar 2023 14:32:02 +0800 Subject: [PATCH 1/4] Optimize the status field, provide more detailed cluster status --- .../postgres-operator/postgres/constants.py | 6 + .../postgres-operator/postgres/handle.py | 244 +++++------------- .../postgres-operator/postgres/typed.py | 47 ++++ 3 files changed, 112 insertions(+), 185 deletions(-) diff --git a/platforms/kubernetes/postgres-operator/postgres/constants.py b/platforms/kubernetes/postgres-operator/postgres/constants.py index 960ff75..056dfe2 100644 --- a/platforms/kubernetes/postgres-operator/postgres/constants.py +++ b/platforms/kubernetes/postgres-operator/postgres/constants.py @@ -166,6 +166,12 @@ # storage STORAGE_CLASS_NAME = "storageClassName" +# conditions +CONDITIONS = "conditions" +CONDITION_TRUE = "True" +CONDITION_FALSE = "False" +CONDITION_UNKNOWN = "Unknown" + # pod PriorityClass SPEC_POD_PRIORITY_CLASS = "priorityClassName" SPEC_POD_PRIORITY_CLASS_SCOPE_NODE = "system-node-critical" diff --git a/platforms/kubernetes/postgres-operator/postgres/handle.py b/platforms/kubernetes/postgres-operator/postgres/handle.py index 8ece891..f055b69 100644 --- a/platforms/kubernetes/postgres-operator/postgres/handle.py +++ b/platforms/kubernetes/postgres-operator/postgres/handle.py @@ -13,182 +13,10 @@ from kubernetes import client, config from kubernetes.stream import stream from config import operator_config -from typed import LabelType, InstanceConnection, InstanceConnections, TypedDict, InstanceConnectionMachine, InstanceConnectionK8S, Tuple, Any, List +from typed import LabelType, InstanceConnection, InstanceConnections, TypedDict, InstanceConnectionMachine, InstanceConnectionK8S, Tuple, Any, List, Conditions from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger - -from constants import ( - VIP, - RADONDB_POSTGRES, - POSTGRES_OPERATOR, - AUTOFAILOVER, - POSTGRESQL, - READWRITEINSTANCE, - READONLYINSTANCE, - MACHINES, - ACTION, - ACTION_START, - ACTION_STOP, - IMAGE, - PODSPEC, - SPEC, - CONTAINERS, - CONTAINER_NAME, - PODSPEC_CONTAINERS_POSTGRESQL_CONTAINER, - PODSPEC_CONTAINERS_EXPORTER_CONTAINER, - SPEC_POSTGRESQL_READWRITE_RESOURCES_LIMITS_MEMORY, - SPEC_POSTGRESQL_READWRITE_RESOURCES_LIMITS_CPU, - PRIME_SERVICE_PORT_NAME, - EXPORTER_SERVICE_PORT_NAME, - HBAS, - CONFIGS, - REPLICAS, - VOLUMECLAIMTEMPLATES, - AUTOCTL_NODE, - PGAUTOFAILOVER_REPLICATOR, - STREAMING, - STREAMING_SYNC, - STREAMING_ASYNC, - DELETE_PVC, - POSTGRESQL_PVC_NAME, - SUCCESS, - FAILED, - SERVICES, - SELECTOR, - SERVICE_AUTOFAILOVER, - SERVICE_PRIMARY, - SERVICE_STANDBY, - SERVICE_READONLY, - SERVICE_STANDBY_READONLY, - SPEC_POSTGRESQL_USERS, - SPEC_POSTGRESQL_USERS_ADMIN, - SPEC_POSTGRESQL_USERS_MAINTENANCE, - SPEC_POSTGRESQL_USERS_NORMAL, - SPEC_POSTGRESQL_USERS_USER_NAME, - SPEC_POSTGRESQL_USERS_USER_PASSWORD, - API_GROUP, - API_VERSION_V1, - RESOURCE_POSTGRESQL, - RESOURCE_KIND_POSTGRESQL, - CLUSTER_STATE, - CLUSTER_CREATE_BEGIN, - CLUSTER_CREATE_ADD_FAILOVER, - CLUSTER_CREATE_ADD_READWRITE, - CLUSTER_CREATE_ADD_READONLY, - CLUSTER_CREATE_FINISH, - BASE_LABEL_PART_OF, - BASE_LABEL_MANAGED_BY, - BASE_LABEL_NAME, - BASE_LABEL_NAMESPACE, - LABEL_NODE, - LABEL_NODE_AUTOFAILOVER, - LABEL_NODE_POSTGRESQL, - LABEL_NODE_USER_SERVICES, - LABEL_NODE_STATEFULSET_SERVICES, - LABEL_SUBNODE, - LABEL_SUBNODE_READWRITE, - LABEL_SUBNODE_AUTOFAILOVER, - LABEL_SUBNODE_READONLY, - LABEL_ROLE, - LABEL_ROLE_PRIMARY, - LABEL_ROLE_STANDBY, - LABEL_STATEFULSET_NAME, - MACHINE_MODE, - K8S_MODE, - PGHOME, - DOCKER_COMPOSE_FILE, - DOCKER_COMPOSE_FILE_DATA, - DOCKER_COMPOSE_ENV, - DOCKER_COMPOSE_ENV_DATA, - DOCKER_COMPOSE_ENVFILE, - DOCKER_COMPOSE_EXPORTER_ENVFILE, - DOCKER_COMPOSE_DIR, - PGDATA_DIR, - ASSIST_DIR, - DATA_DIR, - INIT_FINISH, - PG_CONFIG_PREFIX, - PG_HBA_PREFIX, - RESTORE, - RESTORE_FROMSSH, - RESTORE_FROMSSH_PATH, - RESTORE_FROMSSH_ADDRESS, - RESTORE_FROMSSH_LOCAL, - PG_DATABASE_DIR, - PG_DATABASE_RESTORING_DIR, - LVS_BODY, - LVS_REAL_MAIN_SERVER, - LVS_REAL_READ_SERVER, - LVS_REAL_EMPTY_SERVER, - LVS_SET_NET, - LVS_UNSET_NET, - CLUSTER_STATUS_CREATE, - CLUSTER_STATUS_UPDATE, - CLUSTER_STATUS_RUN, - CLUSTER_STATUS_STOP, - CLUSTER_STATUS_CREATE_FAILED, - CLUSTER_STATUS_UPDATE_FAILED, - CLUSTER_STATUS_TERMINATE, - CLUSTER_STATUS, - SPEC_ANTIAFFINITY, - SPEC_ANTIAFFINITY_POLICY, - SPEC_ANTIAFFINITY_REQUIRED, - SPEC_ANTIAFFINITY_PREFERRED, - SPEC_ANTIAFFINITY_POLICY_REQUIRED, - SPEC_ANTIAFFINITY_POLICY_PREFERRED, - SPEC_ANTIAFFINITY_PODANTIAFFINITYTERM, - SPEC_ANTIAFFINITY_TOPOLOGYKEY, - SPEC_VOLUME_TYPE, - SPEC_VOLUME_LOCAL, - SPEC_VOLUME_CLOUD, - SECONDS, - MINUTES, - HOURS, - DAYS, - UPDATE_TOLERATION, - SPEC_POD_PRIORITY_CLASS, - SPEC_POD_PRIORITY_CLASS_SCOPE_NODE, - SPEC_POD_PRIORITY_CLASS_SCOPE_CLUSTER, - SPEC_S3, - SPEC_S3_ACCESS_KEY, - SPEC_S3_SECRET_KEY, - SPEC_S3_ENDPOINT, - SPEC_S3_BUCKET, - SPEC_S3_PATH, - SPEC_BACKUPCLUSTER, - SPEC_BACKUPTOS3, - SPEC_BACKUPTOS3_NAME, - SPEC_BACKUPTOS3_MANUAL, - SPEC_BACKUPTOS3_MANUAL_TRIGGER_ID, - SPEC_BACKUPTOS3_CRON, - SPEC_BACKUPTOS3_CRON_ENABLE, - SPEC_BACKUPTOS3_CRON_SCHEDULE, - SPEC_BACKUPTOS3_POLICY, - SPEC_BACKUPTOS3_POLICY_ARCHIVE, - SPEC_BACKUPTOS3_POLICY_ARCHIVE_DEFAULT_VALUE, - SPEC_BACKUPTOS3_POLICY_COMPRESSION, - SPEC_BACKUPTOS3_POLICY_COMPRESSION_DEFAULT_VALUE, - SPEC_BACKUPTOS3_POLICY_ENCRYPTION, - SPEC_BACKUPTOS3_POLICY_ENCRYPTION_DEFAULT_VALUE, - SPEC_BACKUPTOS3_POLICY_RETENTION, - SPEC_BACKUPTOS3_POLICY_RETENTION_DEFAULT_VALUE, - SPEC_BACKUPTOS3_POLICY_RETENTION_DELETE_ALL_VALUE, - RESTORE_FROMS3, - RESTORE_FROMS3_NAME, - RESTORE_FROMS3_RECOVERY, - CLUSTER_STATUS_BACKUP, - CLUSTER_STATUS_ARCHIVE, - CLUSTER_STATUS_CRON_NEXT_RUN, - RESTORE_FROMS3_RECOVERY_LATEST, - RESTORE_FROMS3_RECOVERY_LATEST_FULL, - RESTORE_FROMS3_RECOVERY_OLDEST_FULL, - RECOVERY_FINISH, - PG_LOG_FILENAME, - SPEC_REBUILD, - SPCE_REBUILD_NODENAMES, - SPEC_DELETE_S3, - STORAGE_CLASS_NAME, -) +from constants import * PGLOG_DIR = "log" PRIMARY_FORMATION = " --formation primary " @@ -400,6 +228,33 @@ def set_password(patch: kopf.Patch, status: kopf.Status) -> None: password_length)) +def patch_cluster_conditions( + patch: kopf.Patch, + type: str, + status: str, + message: str, +) -> None: + condition = Conditions( + type, status, time.strftime(DEFAULT_TIME_FORMAT, time.localtime()), + str(message)) + patch.status[CONDITIONS] = condition.to_dict() + + +def set_cluster_status_and_patch_conditions( + meta: kopf.Meta, + spec: kopf.Spec, + patch: kopf.Patch, + status: kopf.Status, + logger: logging.Logger, + statefield: str, + state: str, + condition_status: str, + condition_message: str, +) -> None: + set_cluster_status(meta, statefield, state, logger) + patch_cluster_conditions(patch, state, condition_status, condition_message) + + def create_statefulset_service( name: str, external_name: str, @@ -3444,7 +3299,10 @@ def create_cluster( logger: logging.Logger, ) -> None: try: - set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_CREATE, logger) + set_cluster_status_and_patch_conditions(meta, spec, patch, status, + logger, CLUSTER_STATE, + CLUSTER_STATUS_CREATE, + CONDITION_UNKNOWN, '') logging.info("check create_cluster params") check_param(spec, logger, create=True) @@ -3464,13 +3322,18 @@ def create_cluster( time.sleep(5) # cluster running update_number_sync_standbys(meta, spec, patch, status, logger) - set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_RUN, logger) + set_cluster_status_and_patch_conditions(meta, spec, patch, status, + logger, CLUSTER_STATE, + CLUSTER_STATUS_RUN, + CONDITION_TRUE, '') except Exception as e: logger.error(f"error occurs, {e}") traceback.print_exc() - traceback.format_exc() - set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_CREATE_FAILED, - logger) + err = traceback.format_exc() + set_cluster_status_and_patch_conditions(meta, spec, patch, status, + logger, CLUSTER_STATE, + CLUSTER_STATUS_CREATE_FAILED, + CONDITION_FALSE, err) def delete_cluster( @@ -3480,7 +3343,10 @@ def delete_cluster( status: kopf.Status, logger: logging.Logger, ) -> None: - set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_TERMINATE, logger) + set_cluster_status_and_patch_conditions(meta, spec, patch, status, logger, + CLUSTER_STATE, + CLUSTER_STATUS_TERMINATE, + CONDITION_UNKNOWN, '') delete_postgresql_cluster(meta, spec, patch, status, logger) @@ -5544,7 +5410,10 @@ def update_cluster( diffs: kopf.Diff, ) -> None: try: - set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_UPDATE, logger) + set_cluster_status_and_patch_conditions(meta, spec, patch, status, + logger, CLUSTER_STATE, + CLUSTER_STATUS_UPDATE, + CONDITION_UNKNOWN, '') logger.info("check update_cluster params") check_param(spec, logger, create=False) need_roll_update = False @@ -5665,13 +5534,18 @@ def update_cluster( else: cluster_status = CLUSTER_STATUS_RUN # set Running - set_cluster_status(meta, CLUSTER_STATE, cluster_status, logger) + set_cluster_status_and_patch_conditions(meta, spec, patch, status, + logger, CLUSTER_STATE, + cluster_status, CONDITION_TRUE, + '') except Exception as e: logger.error(f"error occurs, {e}") traceback.print_exc() - traceback.format_exc() - set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_UPDATE_FAILED, - logger) + err = traceback.format_exc() + set_cluster_status_and_patch_conditions(meta, spec, patch, status, + logger, CLUSTER_STATE, + CLUSTER_STATUS_UPDATE_FAILED, + CONDITION_FALSE, err) def cron_backup( diff --git a/platforms/kubernetes/postgres-operator/postgres/typed.py b/platforms/kubernetes/postgres-operator/postgres/typed.py index 8703ca8..45d1364 100644 --- a/platforms/kubernetes/postgres-operator/postgres/typed.py +++ b/platforms/kubernetes/postgres-operator/postgres/typed.py @@ -1,5 +1,6 @@ import paramiko from typing import Dict, TypedDict, TypeVar, Optional, List, Optional, Callable, Tuple, Any +import six from constants import ( AUTOFAILOVER, POSTGRESQL, @@ -114,3 +115,49 @@ def get_number(self): def free_conns(self): for conn in self.conns: conn.free_conn() + + +class Conditions: + + def __init__(self, type: str, status: str, lastTransitionTime: str, + message: str): + # Type of cluster condition, same as status.state. Running/CreateFailed/UpdateFailed ... + self.type = type + + # Status of the condition, one of True/False/Unknown. + self.status = status + + # The last time this Condition type changed. + self.lastTransitionTime = lastTransitionTime + + # message is a human readable message indicating details about the transition. + self.message = message + + condition_type = { + 'type': 'str', + 'status': 'str', + 'lastTransitionTime': 'str', + 'message': 'str' + } + + def to_dict(self): + result = {} + + for attr, _ in six.iteritems(self.condition_type): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list( + map(lambda x: x.to_dict() + if hasattr(x, "to_dict") else x, value)) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict( + map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") else item, + value.items())) + else: + result[attr] = value + + return result From 311e49aa740dfd3dc9eb61f5e44e3888205e7b05 Mon Sep 17 00:00:00 2001 From: yanboer <yanbo2695388808@gmail.com> Date: Fri, 31 Mar 2023 15:08:34 +0800 Subject: [PATCH 2/4] add CONDITIONS_LIMIT param, set conditions to List --- .../postgres-operator/postgres/constants.py | 1 + .../postgres-operator/postgres/handle.py | 35 ++++++++++++++----- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/platforms/kubernetes/postgres-operator/postgres/constants.py b/platforms/kubernetes/postgres-operator/postgres/constants.py index 056dfe2..cab0371 100644 --- a/platforms/kubernetes/postgres-operator/postgres/constants.py +++ b/platforms/kubernetes/postgres-operator/postgres/constants.py @@ -168,6 +168,7 @@ # conditions CONDITIONS = "conditions" +CONDITIONS_LIMIT = 10 CONDITION_TRUE = "True" CONDITION_FALSE = "False" CONDITION_UNKNOWN = "Unknown" diff --git a/platforms/kubernetes/postgres-operator/postgres/handle.py b/platforms/kubernetes/postgres-operator/postgres/handle.py index f055b69..4acc188 100644 --- a/platforms/kubernetes/postgres-operator/postgres/handle.py +++ b/platforms/kubernetes/postgres-operator/postgres/handle.py @@ -230,14 +230,30 @@ def set_password(patch: kopf.Patch, status: kopf.Status) -> None: def patch_cluster_conditions( patch: kopf.Patch, + status: kopf.Status, + logger: logging.Logger, type: str, - status: str, + condition_status: str, message: str, + override: bool = False, ) -> None: condition = Conditions( - type, status, time.strftime(DEFAULT_TIME_FORMAT, time.localtime()), - str(message)) - patch.status[CONDITIONS] = condition.to_dict() + type, condition_status, time.strftime(DEFAULT_TIME_FORMAT, time.localtime()), + str(message)).to_dict() + conditions = status.get(CONDITIONS, []) + + if not isinstance(conditions, list): + logger.warning(f"patch_cluster_conditions failed, conditions is not list.") + return + + if len(conditions) > 0 and override: + conditions.pop() + conditions.append(condition) + # check conditions limit + if len(conditions) > CONDITIONS_LIMIT: + conditions = conditions[-1 * CONDITIONS_LIMIT:] + + patch.status[CONDITIONS] = conditions def set_cluster_status_and_patch_conditions( @@ -250,9 +266,10 @@ def set_cluster_status_and_patch_conditions( state: str, condition_status: str, condition_message: str, + override: bool = False, ) -> None: set_cluster_status(meta, statefield, state, logger) - patch_cluster_conditions(patch, state, condition_status, condition_message) + patch_cluster_conditions(patch, status, logger, state, condition_status, condition_message, override) def create_statefulset_service( @@ -3325,7 +3342,7 @@ def create_cluster( set_cluster_status_and_patch_conditions(meta, spec, patch, status, logger, CLUSTER_STATE, CLUSTER_STATUS_RUN, - CONDITION_TRUE, '') + CONDITION_TRUE, '', override=True) except Exception as e: logger.error(f"error occurs, {e}") traceback.print_exc() @@ -3333,7 +3350,7 @@ def create_cluster( set_cluster_status_and_patch_conditions(meta, spec, patch, status, logger, CLUSTER_STATE, CLUSTER_STATUS_CREATE_FAILED, - CONDITION_FALSE, err) + CONDITION_FALSE, err, override=True) def delete_cluster( @@ -5537,7 +5554,7 @@ def update_cluster( set_cluster_status_and_patch_conditions(meta, spec, patch, status, logger, CLUSTER_STATE, cluster_status, CONDITION_TRUE, - '') + '', override=True) except Exception as e: logger.error(f"error occurs, {e}") traceback.print_exc() @@ -5545,7 +5562,7 @@ def update_cluster( set_cluster_status_and_patch_conditions(meta, spec, patch, status, logger, CLUSTER_STATE, CLUSTER_STATUS_UPDATE_FAILED, - CONDITION_FALSE, err) + CONDITION_FALSE, err, override=True) def cron_backup( From 76e92a0cd51dced5fb2503dc7ab42a9e3e4465ac Mon Sep 17 00:00:00 2001 From: yanboer <yanboer@users.noreply.github.com> Date: Fri, 31 Mar 2023 07:09:15 +0000 Subject: [PATCH 3/4] reformat code --- .../postgres-operator/postgres/handle.py | 59 ++++++++++++++----- 1 file changed, 43 insertions(+), 16 deletions(-) diff --git a/platforms/kubernetes/postgres-operator/postgres/handle.py b/platforms/kubernetes/postgres-operator/postgres/handle.py index 4acc188..2edf0ab 100644 --- a/platforms/kubernetes/postgres-operator/postgres/handle.py +++ b/platforms/kubernetes/postgres-operator/postgres/handle.py @@ -238,12 +238,14 @@ def patch_cluster_conditions( override: bool = False, ) -> None: condition = Conditions( - type, condition_status, time.strftime(DEFAULT_TIME_FORMAT, time.localtime()), + type, condition_status, + time.strftime(DEFAULT_TIME_FORMAT, time.localtime()), str(message)).to_dict() conditions = status.get(CONDITIONS, []) if not isinstance(conditions, list): - logger.warning(f"patch_cluster_conditions failed, conditions is not list.") + logger.warning( + f"patch_cluster_conditions failed, conditions is not list.") return if len(conditions) > 0 and override: @@ -269,7 +271,8 @@ def set_cluster_status_and_patch_conditions( override: bool = False, ) -> None: set_cluster_status(meta, statefield, state, logger) - patch_cluster_conditions(patch, status, logger, state, condition_status, condition_message, override) + patch_cluster_conditions(patch, status, logger, state, condition_status, + condition_message, override) def create_statefulset_service( @@ -3339,18 +3342,30 @@ def create_cluster( time.sleep(5) # cluster running update_number_sync_standbys(meta, spec, patch, status, logger) - set_cluster_status_and_patch_conditions(meta, spec, patch, status, - logger, CLUSTER_STATE, + set_cluster_status_and_patch_conditions(meta, + spec, + patch, + status, + logger, + CLUSTER_STATE, CLUSTER_STATUS_RUN, - CONDITION_TRUE, '', override=True) + CONDITION_TRUE, + '', + override=True) except Exception as e: logger.error(f"error occurs, {e}") traceback.print_exc() err = traceback.format_exc() - set_cluster_status_and_patch_conditions(meta, spec, patch, status, - logger, CLUSTER_STATE, + set_cluster_status_and_patch_conditions(meta, + spec, + patch, + status, + logger, + CLUSTER_STATE, CLUSTER_STATUS_CREATE_FAILED, - CONDITION_FALSE, err, override=True) + CONDITION_FALSE, + err, + override=True) def delete_cluster( @@ -5551,18 +5566,30 @@ def update_cluster( else: cluster_status = CLUSTER_STATUS_RUN # set Running - set_cluster_status_and_patch_conditions(meta, spec, patch, status, - logger, CLUSTER_STATE, - cluster_status, CONDITION_TRUE, - '', override=True) + set_cluster_status_and_patch_conditions(meta, + spec, + patch, + status, + logger, + CLUSTER_STATE, + cluster_status, + CONDITION_TRUE, + '', + override=True) except Exception as e: logger.error(f"error occurs, {e}") traceback.print_exc() err = traceback.format_exc() - set_cluster_status_and_patch_conditions(meta, spec, patch, status, - logger, CLUSTER_STATE, + set_cluster_status_and_patch_conditions(meta, + spec, + patch, + status, + logger, + CLUSTER_STATE, CLUSTER_STATUS_UPDATE_FAILED, - CONDITION_FALSE, err, override=True) + CONDITION_FALSE, + err, + override=True) def cron_backup( From 47b21bc02798d1755618fb1ff26525b97739c9cf Mon Sep 17 00:00:00 2001 From: yanboer <yanbo2695388808@gmail.com> Date: Mon, 3 Apr 2023 10:37:53 +0800 Subject: [PATCH 4/4] Conditions add __str__ method --- platforms/kubernetes/postgres-operator/postgres/typed.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/platforms/kubernetes/postgres-operator/postgres/typed.py b/platforms/kubernetes/postgres-operator/postgres/typed.py index 45d1364..ebe4e94 100644 --- a/platforms/kubernetes/postgres-operator/postgres/typed.py +++ b/platforms/kubernetes/postgres-operator/postgres/typed.py @@ -140,6 +140,9 @@ def __init__(self, type: str, status: str, lastTransitionTime: str, 'message': 'str' } + def __str__(self): + return str(self.to_dict()) + def to_dict(self): result = {}