Skip to content

Commit 31f474a

Browse files
authored
Enable slot and publication deletion when stream application is removed (#2684)
* refactor the publication syncing section * update the createOrUpdateStream function to allow resource deletion when streams are removed from the manifest * add a minimal FES CRD to enable creation of FES resources in the E2E test * fix a bug that removed manifest slots in syncStream * e2e test: fix typo in the major upgrade test * e2e test: should create and delete FES resources * e2e test: should not delete manually created resources * e2e test: enable the cluster role for FES by patching it instead of deploying it in the manifest
1 parent 73f7241 commit 31f474a

File tree

7 files changed

+263
-45
lines changed

7 files changed

+263
-45
lines changed

e2e/tests/k8s_api.py

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ def __init__(self):
2020

2121
self.config = config.load_kube_config()
2222
self.k8s_client = client.ApiClient()
23+
self.rbac_api = client.RbacAuthorizationV1Api()
2324

2425
self.core_v1 = client.CoreV1Api()
2526
self.apps_v1 = client.AppsV1Api()

e2e/tests/test_e2e.py

+122-3
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ def setUpClass(cls):
129129
"infrastructure-roles.yaml",
130130
"infrastructure-roles-new.yaml",
131131
"custom-team-membership.yaml",
132-
"e2e-storage-class.yaml"]:
132+
"e2e-storage-class.yaml",
133+
"fes.crd.yaml"]:
133134
result = k8s.create_with_kubectl("manifests/" + filename)
134135
print("stdout: {}, stderr: {}".format(result.stdout, result.stderr))
135136

@@ -199,6 +200,7 @@ def test_additional_owner_roles(self):
199200
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", owner_query)), 3,
200201
"Not all additional users found in database", 10, 5)
201202

203+
202204
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
203205
def test_additional_pod_capabilities(self):
204206
'''
@@ -1203,7 +1205,7 @@ def check_version_14():
12031205
version = p["server_version"][0:2]
12041206
return version
12051207

1206-
self.evantuallyEqual(check_version_14, "14", "Version was not upgrade to 14")
1208+
self.eventuallyEqual(check_version_14, "14", "Version was not upgrade to 14")
12071209

12081210
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
12091211
def test_persistent_volume_claim_retention_policy(self):
@@ -1989,6 +1991,123 @@ def test_standby_cluster(self):
19891991
"acid.zalan.do", "v1", "default", "postgresqls", "acid-standby-cluster")
19901992
time.sleep(5)
19911993

1994+
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
1995+
def test_stream_resources(self):
1996+
'''
1997+
Create and delete fabric event streaming resources.

Patches acid-minimal-cluster with a streams section (plus a manually
defined patroni slot), checks that the fes_foo_test_app publication,
the fes_foo_test_app replication slot and one FabricEventStream
resource show up, then empties the streams section and checks that all
three are gone while manual_slot and mypublication are still present.
1998+
'''
1999+
k8s = self.k8s
2000+
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"},
2001+
"Operator does not get in sync")
2002+
leader = k8s.get_cluster_leader_pod()
2003+
2004+
# patch the postgres-operator ClusterRole with CRUD privileges on FES resources
# (the rule is appended to the rules read from the cluster, then patched back)
2005+
cluster_role = k8s.api.rbac_api.read_cluster_role("postgres-operator")
2006+
fes_cluster_role_rule = client.V1PolicyRule(
2007+
api_groups=["zalando.org"],
2008+
resources=["fabriceventstreams"],
2009+
verbs=["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"]
2010+
)
2011+
cluster_role.rules.append(fes_cluster_role_rule)
2012+
k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role)
2013+
2014+
# create the table referenced by the stream in the foo database of acid-minimal-cluster
2015+
create_stream_table = """
2016+
CREATE TABLE test_table (id int, payload jsonb);
2017+
"""
2018+
self.query_database(leader.metadata.name, "foo", create_stream_table)
2019+
2020+
# update the manifest with a streams section for test_table and, in the same
# patch, a physical slot under patroni.slots that must survive stream removal
2021+
patch_streaming_config = {
2022+
"spec": {
2023+
"patroni": {
2024+
"slots": {
2025+
"manual_slot": {
2026+
"type": "physical"
2027+
}
2028+
}
2029+
},
2030+
"streams": [
2031+
{
2032+
"applicationId": "test-app",
2033+
"batchSize": 100,
2034+
"database": "foo",
2035+
"enableRecovery": True,
2036+
"tables": {
2037+
"test_table": {
2038+
"eventType": "test-event",
2039+
"idColumn": "id",
2040+
"payloadColumn": "payload",
2041+
"recoveryEventType": "test-event-dlq"
2042+
}
2043+
}
2044+
}
2045+
]
2046+
}
2047+
}
2048+
k8s.api.custom_objects_api.patch_namespaced_custom_object(
2049+
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config)
2050+
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
2051+
2052+
# check that the publication, the slot, and the FES resource have been created;
# both the publication and the slot are looked up under the name fes_foo_test_app
# NOTE(review): the trailing 10, 5 arguments are presumably retries and interval
# of eventuallyEqual - confirm against its definition
2053+
get_publication_query = """
2054+
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app';
2055+
"""
2056+
get_slot_query = """
2057+
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app';
2058+
"""
2059+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1,
2060+
"Publication is not created", 10, 5)
2061+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1,
2062+
"Replication slot is not created", 10, 5)
2063+
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
2064+
"zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1,
2065+
"Could not find Fabric Event Stream resource", 10, 5)
2066+
2067+
# grant CREATE on database foo and ownership of test_table to foo_user, and reset
# foo_user's search path to the default, so foo_user can create its own publication
2068+
grant_permission_foo_user = """
2069+
GRANT CREATE ON DATABASE foo TO foo_user;
2070+
ALTER TABLE test_table OWNER TO foo_user;
2071+
ALTER ROLE foo_user RESET search_path;
2072+
"""
2073+
self.query_database(leader.metadata.name, "foo", grant_permission_foo_user)
2074+
# a non-postgres user (foo_user) creates a publication outside the streams section
2075+
create_nonstream_publication = """
2076+
CREATE PUBLICATION mypublication FOR TABLE test_table;
2077+
"""
2078+
self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user")
2079+
2080+
# remove all streams from the manifest by patching in an empty streams list
2081+
patch_streaming_config_removal = {
2082+
"spec": {
2083+
"streams": []
2084+
}
2085+
}
2086+
k8s.api.custom_objects_api.patch_namespaced_custom_object(
2087+
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal)
2088+
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
2089+
2090+
# check that the publication, the slot, and the FES resource have been removed
2091+
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
2092+
"zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0,
2093+
'Could not delete Fabric Event Stream resource', 10, 5)
2094+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0,
2095+
"Publication is not deleted", 10, 5)
2096+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0,
2097+
"Replication slot is not deleted", 10, 5)
2098+
2099+
# check that manual_slot and mypublication, which were not created through the
# streams section, have not been deleted
2100+
get_manual_slot_query = """
2101+
SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot';
2102+
"""
2103+
get_nonstream_publication_query = """
2104+
SELECT * FROM pg_publication WHERE pubname = 'mypublication';
2105+
"""
2106+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1,
2107+
"Slot defined in patroni config is deleted", 10, 5)
2108+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1,
2109+
"Publication defined not in stream section is deleted", 10, 5)
2110+
19922111
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
19932112
def test_taint_based_eviction(self):
19942113
'''
@@ -2115,7 +2234,7 @@ def test_zz_cluster_deletion(self):
21152234
self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted")
21162235
self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted")
21172236
self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted")
2118-
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 7, "Secrets were deleted although disabled in config")
2237+
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config")
21192238
self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config")
21202239

21212240
except timeout_decorator.TimeoutError:

manifests/fes.crd.yaml

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Minimal CustomResourceDefinition for FabricEventStream (group zalando.org,
# version v1, namespaced, short name "fes"). The schema only declares the root
# as an object; no properties are described here.
apiVersion: apiextensions.k8s.io/v1
2+
kind: CustomResourceDefinition
3+
metadata:
4+
name: fabriceventstreams.zalando.org
5+
spec:
6+
group: zalando.org
7+
names:
8+
kind: FabricEventStream
9+
listKind: FabricEventStreamList
10+
plural: fabriceventstreams
11+
singular: fabriceventstream
12+
shortNames:
13+
- fes
14+
categories:
15+
- all
16+
scope: Namespaced
17+
versions:
18+
- name: v1
19+
served: true
20+
storage: true
21+
schema:
22+
openAPIV3Schema:
23+
type: object

pkg/apis/zalando.org/v1/fabriceventstream.go

+6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package v1
22

33
import (
4+
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
45
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
56
)
67

@@ -89,3 +90,8 @@ type DBAuth struct {
8990
UserKey string `json:"userKey,omitempty"`
9091
PasswordKey string `json:"passwordKey,omitempty"`
9192
}
93+
94+
// Slot groups a slot definition with the publication that belongs to it.
// NOTE(review): Slot presumably holds replication slot settings as key/value
// pairs, and Publication is presumably keyed by table name - confirm against
// the code that populates this struct.
type Slot struct {
95+
Slot map[string]string `json:"slot"`
96+
Publication map[string]acidv1.StreamTable `json:"publication"`
97+
}

pkg/cluster/database.go

+11
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,12 @@ const (
4949
getPublicationsSQL = `SELECT p.pubname, string_agg(pt.schemaname || '.' || pt.tablename, ', ' ORDER BY pt.schemaname, pt.tablename)
5050
FROM pg_publication p
5151
LEFT JOIN pg_publication_tables pt ON pt.pubname = p.pubname
52+
WHERE p.pubowner = 'postgres'::regrole
53+
AND p.pubname LIKE 'fes_%'
5254
GROUP BY p.pubname;`
5355
createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');`
5456
alterPublicationSQL = `ALTER PUBLICATION "%s" SET TABLE %s;`
57+
dropPublicationSQL = `DROP PUBLICATION "%s";`
5558

5659
globalDefaultPrivilegesSQL = `SET ROLE TO "%s";
5760
ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "%s","%s";
@@ -628,6 +631,14 @@ func (c *Cluster) getPublications() (publications map[string]string, err error)
628631
return dbPublications, err
629632
}
630633

634+
func (c *Cluster) executeDropPublication(pubName string) error {
635+
c.logger.Infof("dropping publication %q", pubName)
636+
if _, err := c.pgDb.Exec(fmt.Sprintf(dropPublicationSQL, pubName)); err != nil {
637+
return fmt.Errorf("could not execute drop publication: %v", err)
638+
}
639+
return nil
640+
}
641+
631642
// executeCreatePublication creates new publication for given tables
632643
// The caller is responsible for opening and closing the database connection.
633644
func (c *Cluster) executeCreatePublication(pubName, tableList string) error {

0 commit comments

Comments
 (0)