forked from project-codeflare/codeflare-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathauth.py
122 lines (105 loc) · 4.32 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# Copyright 2022 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The auth sub-module contains the definitions for the Authentication objects, which represent
the methods by which a user can authenticate to their cluster(s). The abstract class, `Authentication`,
contains two required methods `login()` and `logout()`. Users can use one of the existing concrete classes to
authenticate to their cluster or add their own custom concrete classes here.
"""
import abc
import openshift as oc
from openshift import OpenShiftPythonException
class Authentication(metaclass=abc.ABCMeta):
    """
    Common base for the authentication strategies defined in this module.

    Every strategy is expected to offer a `login()` and a `logout()` method. The versions
    defined here are placeholders that do nothing and return `None`; they are not marked
    with `@abc.abstractmethod`, so overriding them is a convention rather than something
    Python enforces.
    """

    def login(self):
        """
        Placeholder for logging in to a remote cluster; performs no work here.
        """
        return None

    def logout(self):
        """
        Placeholder for logging out of the remote cluster; performs no work here.
        """
        return None
class TokenAuthentication(Authentication):
    """
    `TokenAuthentication` is a subclass of `Authentication`. It can be used to authenticate to an OpenShift
    cluster when the user has an API token and the API server address.
    """

    # Port appended to `server` when the caller did not include one.
    DEFAULT_API_PORT = 6443

    def __init__(self, token: str = None, server: str = None, skip_tls: bool = False):
        """
        Initialize a TokenAuthentication object that requires a value for `token`, the API Token
        and `server`, the API server address for authenticating to an OpenShift cluster.
        Set `skip_tls` to `True` to log in with `--insecure-skip-tls-verify`.
        """
        self.token = token
        self.server = server
        self.skip_tls = skip_tls

    def _server_address(self) -> str:
        """
        Return `server` with the default API port appended unless it already ends in a port.

        Previously `:6443` was appended unconditionally, so a server given as
        `https://host:6443` became `https://host:6443:6443`. A trailing `/` is ignored
        when looking for the port and dropped before the default port is appended.
        """
        server = str(self.server)
        trimmed = server.rstrip("/")
        # Whatever follows the last ":" is a port only if it is purely numeric;
        # for "https://host" that tail is "//host", for "host" it is "host".
        port_candidate = trimmed.rpartition(":")[2]
        if port_candidate.isdigit():
            return server
        return f"{trimmed}:{self.DEFAULT_API_PORT}"

    def _connection_args(self) -> list:
        """
        Build the `--token` / `--server` arguments shared by `login()` and `logout()`.
        """
        return [f"--token={self.token}", f"--server={self._server_address()}"]

    def login(self) -> str:
        """
        This function is used to login to an OpenShift cluster using the user's API token and API server address.
        Depending on the cluster, a user can choose to log in with `--insecure-skip-tls-verify` by setting `skip_tls`
        to `True`.

        Returns the output of the login invocation on success. If the invocation fails, returns a hint
        to set `skip_tls=True` when the error mentions a certificate signed by an unknown authority,
        and otherwise returns the error text itself.

        Raises `PermissionError` when the error text contains "invalid".
        """
        args = self._connection_args()
        if self.skip_tls:
            args.append("--insecure-skip-tls-verify")
        try:
            response = oc.invoke("login", args)
        except OpenShiftPythonException as osp:
            error_msg = osp.result.err()
            if "The server uses a certificate signed by unknown authority" in error_msg:
                return "Error: certificate auth failure, please set `skip_tls=True` in TokenAuthentication"
            if "invalid" in error_msg:
                # Chain explicitly so the original failure stays attached to the traceback.
                raise PermissionError(error_msg) from osp
            return error_msg
        return response.out()

    def logout(self) -> str:
        """
        This function is used to logout of an OpenShift cluster.

        Returns the output of the logout invocation. Unlike `login()`, a failed invocation
        is not caught here, so its exception propagates to the caller.
        """
        response = oc.invoke("logout", self._connection_args())
        return response.out()
class PasswordUserAuthentication(Authentication):
    """
    Authentication strategy for users who hold a username and password for an OpenShift
    cluster. It is a subclass of `Authentication`.
    """

    def __init__(self, username: str = None, password: str = None):
        """
        Store the `username` and `password` that `login()` will later hand to the
        OpenShift client.
        """
        self.username = username
        self.password = password

    def login(self) -> str:
        """
        Log in to an OpenShift cluster with the stored `username` and `password` and
        return the output of that call.
        """
        return oc.login(self.username, self.password).out()

    def logout(self) -> str:
        """
        Log out of an OpenShift cluster and return the output of that call.
        """
        return oc.invoke("logout").out()