# configmaplock.py
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# NOTE(review): ".." is a relative search-path entry, so it is resolved against
# the process's current working directory, not this file's location —
# presumably added so the local `kubernetes` package can be imported when the
# example is run from its own directory; confirm before relying on it.
import sys
sys.path.append("..")
from kubernetes.client.rest import ApiException
from kubernetes import client, config
from kubernetes.client.api_client import ApiClient
from leaderelectionrecord import LeaderElectionRecord
import json
import logging
# Import-time side effect: configures the root logger at INFO level for any
# process that imports this module (a no-op if the root logger already has
# handlers).
logging.basicConfig(level=logging.INFO)
class ConfigMapLock:
    """Leader-election lock stored as a JSON annotation on a ConfigMap.

    The election record is serialized with ``json.dumps`` under the
    annotation key held in ``leader_electionrecord_annotationkey``.
    ``get`` caches the ConfigMap it read in ``configmap_reference`` and
    ``update`` writes that cached object back, so ``get`` has to succeed
    before ``update`` is called.
    """

    def __init__(self, name, namespace, identity):
        """
        :param name: name of the lock
        :param namespace: namespace
        :param identity: A unique identifier that the candidate is using
        """
        self.api_instance = client.CoreV1Api()
        self.leader_electionrecord_annotationkey = 'control-plane.alpha.kubernetes.io/leader'
        self.name = name
        self.namespace = namespace
        self.identity = str(identity)
        # ConfigMap object most recently read by get(); update() sends it back.
        self.configmap_reference = None
        # Dict that get_lock_dict() overwrites and returns on every call.
        self.lock_record = {
            'holderIdentity': None,
            'leaseDurationSeconds': None,
            'acquireTime': None,
            'renewTime': None
        }

    # get returns the election record from a ConfigMap Annotation
    def get(self, name, namespace):
        """
        Read the ConfigMap and decode the election record stored on it.

        On a successful read the ConfigMap is cached in
        ``self.configmap_reference`` and guaranteed to carry the leader
        annotation key (with an empty value if it had none), so a following
        ``update`` can fill it in. Annotations other than the leader key are
        left untouched.

        :param name: Name of the configmap object information to get
        :param namespace: Namespace in which the configmap object is to be searched
        :return: ``(True, election record)`` if the object was found and holds
            a record, ``(True, None)`` if it was found but holds no usable
            record (annotation missing, empty, or not a JSON object),
            ``(False, exception response)`` if the API call failed
        """
        # Only the API call can raise ApiException, so keep the try minimal.
        try:
            api_response = self.api_instance.read_namespaced_config_map(name, namespace)
        except ApiException as e:
            return False, e

        annotation_key = self.leader_electionrecord_annotationkey
        self.configmap_reference = api_response

        annotations = api_response.metadata.annotations
        # No annotations at all - start a fresh dict holding only the leader key
        if not annotations:
            api_response.metadata.annotations = {annotation_key: ''}
            return True, None

        # Annotations exist but the leader key is missing or empty - add the
        # key while preserving every other annotation on the object, otherwise
        # the next update() would erase them on the server.
        raw_record = annotations.get(annotation_key)
        if not raw_record:
            merged = dict(annotations)
            merged[annotation_key] = ''
            api_response.metadata.annotations = merged
            return True, None

        # The annotation can be edited by anything with access to the
        # ConfigMap, so a value that is not a JSON object is reported as
        # "no record" instead of raising out of get().
        try:
            decoded = json.loads(raw_record)
        except (TypeError, ValueError) as e:
            logging.info("Ignoring malformed election record as {}".format(e))
            return True, None
        if not isinstance(decoded, dict):
            logging.info("Ignoring election record that is not a JSON object")
            return True, None

        return True, self.get_lock_object(decoded)

    def create(self, name, namespace, election_record):
        """
        Create the ConfigMap with the election record as its leader annotation.

        :param election_record: election record to serialize into the annotation
        :param name: Name of the configmap object to be created
        :param namespace: Namespace in which the configmap object is to be created
        :return: 'True' if object is created else 'False' if failed
        """
        body = client.V1ConfigMap(
            metadata={"name": name,
                      "annotations": {self.leader_electionrecord_annotationkey: json.dumps(self.get_lock_dict(election_record))}})

        try:
            self.api_instance.create_namespaced_config_map(namespace, body, pretty=True)
        except ApiException as e:
            logging.info("Failed to create lock as {}".format(e))
            return False
        return True

    def update(self, name, namespace, updated_record):
        """
        Write the updated election record back to the cached ConfigMap.

        Uses ``self.configmap_reference``, which is only populated by a
        successful ``get``; calling this before ``get`` fails with an
        ``AttributeError`` because the reference is still ``None``.

        :param name: name of the lock to be updated
        :param namespace: namespace the lock is in
        :param updated_record: the updated election record
        :return: True if update is successful False if it fails
        """
        # Set the updated record
        self.configmap_reference.metadata.annotations[self.leader_electionrecord_annotationkey] = json.dumps(self.get_lock_dict(updated_record))

        try:
            self.api_instance.replace_namespaced_config_map(name=name, namespace=namespace,
                                                            body=self.configmap_reference)
        except ApiException as e:
            logging.info("Failed to update lock as {}".format(e))
            return False
        return True

    def get_lock_object(self, lock_record):
        """
        Build a LeaderElectionRecord from a decoded annotation dict.

        :param lock_record: dict decoded from the leader annotation
        :return: LeaderElectionRecord whose fields are set from the truthy
            values in ``lock_record``; fields that are missing or falsy stay
            at the ``None`` they were constructed with
        """
        leader_election_record = LeaderElectionRecord(None, None, None, None)

        field_map = (
            ('holderIdentity', 'holder_identity'),
            ('leaseDurationSeconds', 'lease_duration'),
            ('acquireTime', 'acquire_time'),
            ('renewTime', 'renew_time'),
        )
        for json_key, attribute in field_map:
            value = lock_record.get(json_key)
            if value:
                setattr(leader_election_record, attribute, value)

        return leader_election_record

    def get_lock_dict(self, leader_election_record):
        """
        Copy the record's fields into ``self.lock_record`` and return it.

        The returned dict is the same object on every call (it is overwritten
        in place), so serialize or copy it before calling this again.

        :param leader_election_record: object exposing ``holder_identity``,
            ``lease_duration``, ``acquire_time`` and ``renew_time``
        :return: ``self.lock_record`` filled with the record's values
        """
        self.lock_record['holderIdentity'] = leader_election_record.holder_identity
        self.lock_record['leaseDurationSeconds'] = leader_election_record.lease_duration
        self.lock_record['acquireTime'] = leader_election_record.acquire_time
        self.lock_record['renewTime'] = leader_election_record.renew_time

        return self.lock_record