-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathgui.py
306 lines (257 loc) · 11.2 KB
/
gui.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
import logging
from datetime import datetime
from functools import partial
from typing import List, Optional
from PyQt6.QtCore import QCoreApplication
from PyQt6.QtWidgets import QMainWindow
from core.core import Core
from gui.mfa_dialog import MfaDialog
from gui.repeater import Repeater
from app.core import files
from app.core.config import Config, ProfileGroup
from app.core.core import Core
from app.core.result import Result
from app.gui.access_key_dialog import SetKeyDialog
from app.gui.assets import Assets, ICON_STYLE_OUTLINE, ICON_STYLE_ERROR, ICON_STYLE_FULL, ICON_STYLE_GCP, \
ICON_STYLE_BUSY
from app.gui.background_task import BackgroundTask
from app.gui.config_dialog import ConfigDialog
from app.gui.key_rotation_dialog import RotateKeyDialog
from app.gui.log_dialog import LogDialog
from app.gui.mfa_dialog import MfaDialog
from app.gui.repeater import Repeater
from app.gui.service_profile_dialog import ServiceProfileDialog
from app.gui.trayicon import SystemTrayIcon
from app.yubico import mfa
logger = logging.getLogger('logsmith')
class Gui(QMainWindow):
def __init__(self, app):
QMainWindow.__init__(self)
self.app = app
self.core = Core()
self.assets: Assets = Assets()
self.last_login: str = 'never'
self.login_repeater = Repeater()
self.tray_icon = SystemTrayIcon(parent=self,
assets=self.assets,
profile_list=self.core.config.list_groups())
self.log_dialog = LogDialog(self)
self.config_dialog = ConfigDialog(self)
self.set_key_dialog = SetKeyDialog(self)
self.rotate_key_dialog = RotateKeyDialog(self)
self.service_profile_dialog = ServiceProfileDialog(self)
# This is needed to keep the task alive, otherwise it crashes the application
self.task: Optional[BackgroundTask] = None
self.tray_icon.show()
def login(self, profile_group: ProfileGroup, mfa_token: Optional[str] = None):
self._to_busy_state()
self.task = BackgroundTask(
func=self.core.login,
func_kwargs={'profile_group': profile_group, 'mfa_token': mfa_token},
on_success=self._on_login_success,
on_failure=partial(self._on_login_failure, profile_group=profile_group),
on_error=self._on_error
)
self.task.start()
def _on_login_success(self):
logger.info('login success')
if self.core.active_profile_group.service_profile:
self.tray_icon.set_service_role(profile_name=self.core.active_profile_group.service_profile.source,
role_name=self.core.active_profile_group.service_profile.role)
self.tray_icon.update_region_text(self.core.get_region())
self.tray_icon.update_copy_menus(self.core.active_profile_group)
logger.info('start repeater')
prepare_login = partial(self.login, profile_group=self.core.active_profile_group)
self.login_repeater.start(task=prepare_login,
delay_seconds=300)
self._to_login_state()
def _on_login_failure(self, profile_group: ProfileGroup):
logger.info('login failure')
mfa_token = mfa.fetch_mfa_token_from_shell(self.core.config.mfa_shell_command)
if not mfa_token:
mfa_token = self.show_mfa_token_fetch_dialog()
if not mfa_token:
logger.warning('no mfa token provided')
self._to_error_state()
return
self.login(profile_group=profile_group, mfa_token=mfa_token)
def login_gcp(self, profile_group: ProfileGroup):
self._to_busy_state()
self.task = BackgroundTask(
func=self.core.login_gcp,
func_kwargs={'profile_group': profile_group},
on_success=self._on_login_gcp_success,
on_failure=self._on_error,
on_error=self._on_error
)
self.task.start()
def _on_login_gcp_success(self):
logger.info('start repeater to remind login in 8 hours')
prepare_login = partial(self.login_gcp, profile_group=self.core.active_profile_group)
self.login_repeater.start(task=prepare_login,
delay_seconds=8 * 60 * 60)
self._to_login_state()
def logout(self):
self._to_busy_state()
self.task = BackgroundTask(
func=self.core.logout,
func_kwargs={},
on_success=self._on_logout_success,
on_failure=self._on_error,
on_error=self._on_error
)
self.task.start()
def _on_logout_success(self):
self._to_reset_state()
self.tray_icon.update_region_text('not logged in')
self.tray_icon.reset_copy_menus()
def set_region(self, region: str) -> None:
self._to_busy_state()
self.task = BackgroundTask(
func=self.core.set_region,
func_kwargs={'region': region},
on_success=self._on_set_region_success,
on_failure=self._on_error,
on_error=self._on_error
)
self.task.start()
def _on_set_region_success(self) -> None:
region = self.core.get_region()
if not region:
region = 'not logged in'
self.tray_icon.update_region_text(region)
self._to_login_state()
def edit_config(self, config: Config):
self._to_busy_state()
self.task = BackgroundTask(
func=self.core.edit_config,
func_kwargs={'new_config': config},
on_success=self._on_edit_config_success,
on_failure=self._on_error,
on_error=self._on_error
)
self.task.start()
def _on_edit_config_success(self):
self.tray_icon.populate_context_menu(self.core.get_profile_group_list())
self._to_reset_state()
def set_access_key(self, key_name, key_id, key_secret):
self._to_busy_state()
logger.info('initiate set key')
self.task = BackgroundTask(
func=self.core.set_access_key,
func_kwargs={'key_name': key_name, 'key_id': key_id, 'key_secret': key_secret},
on_success=self._on_set_access_key_success,
on_failure=self._on_error,
on_error=self._on_error
)
self.task.start()
def _on_set_access_key_success(self):
logger.info('access key set')
self._signal('Success', 'access key set')
self._to_login_state()
def rotate_access_key(self, key_name: str, mfa_token: Optional[str] = None):
self._to_busy_state()
logger.info('initiate key rotation')
self.task = BackgroundTask(
func=self.core.rotate_access_key,
func_kwargs={'access_key': key_name, 'mfa_token': mfa_token},
on_success=self._on_rotate_access_key_success,
on_failure=partial(self._on_rotate_access_key_failure, key_name=key_name),
on_error=self._on_error
)
self.task.start()
def _on_rotate_access_key_success(self):
logger.info('key was rotated')
self._signal('Success', 'key was rotated')
self._to_login_state()
def _on_rotate_access_key_failure(self, key_name: str):
logger.info('rotation failure')
mfa_token = mfa.fetch_mfa_token_from_shell(self.core.config.mfa_shell_command)
if not mfa_token:
mfa_token = self.show_mfa_token_fetch_dialog()
if not mfa_token:
logger.warning('no mfa token provided')
self._to_error_state()
return
self.rotate_access_key(key_name=key_name, mfa_token=mfa_token)
def set_service_role(self, profile: str, role: str):
self._to_busy_state()
self.task = BackgroundTask(
func=self.core.set_service_role,
func_kwargs={'profile_name': profile, 'role_name': role},
on_success=self._on_set_service_role_success,
on_failure=self._on_error,
on_error=self._on_error
)
self.task.start()
def _on_set_service_role_success(self):
logger.info('service role was set')
self.login(profile_group=self.core.active_profile_group)
def set_assumable_roles(self, profile: str, role_list: List[str]):
self.task = BackgroundTask(
func=self.core.set_available_service_roles,
func_kwargs={'profile': profile, 'role_list': role_list},
on_success=self._on_set_assumable_roles_success,
on_failure=self._on_error,
on_error=self._on_error
)
self.task.start()
def _on_set_assumable_roles_success(self):
logger.info('assumable roles were set')
self._signal('Success', 'available role list was set')
def _on_error(self, error_message):
logger.error(error_message)
self._signal_error(error_message)
self._to_error_state()
@staticmethod
def show_mfa_token_fetch_dialog():
return MfaDialog().get_mfa_token()
def show_config_dialog(self):
self.config_dialog.show_dialog(self.core.config)
def show_set_key_dialog(self):
self.set_key_dialog.show_dialog(access_key_list=self.core.get_access_key_list())
def show_access_key_rotation_dialog(self):
self.rotate_key_dialog.show_dialog(access_key_list=self.core.get_access_key_list())
def show_service_role_dialog(self):
self.service_profile_dialog.show_dialog(core=self.core, config=self.core.config)
def show_logs(self):
logs_as_text = files.load_logs()
self.log_dialog.show_dialog(logs_as_text)
def _to_login_state(self):
if self.core.active_profile_group:
style = ICON_STYLE_FULL if self.core.active_profile_group.type == "aws" else ICON_STYLE_GCP
self.tray_icon.setIcon(self.assets.get_icon(style=style, color_code=self.core.get_active_profile_color()))
self.tray_icon.disable_actions(False)
self.tray_icon.update_last_login(self.get_timestamp())
else:
self._to_reset_state()
def _to_busy_state(self):
self.tray_icon.setIcon(self.assets.get_icon(ICON_STYLE_BUSY))
self.tray_icon.disable_actions(True)
def _to_reset_state(self):
self.login_repeater.stop()
self.tray_icon.setIcon(self.assets.get_icon(ICON_STYLE_OUTLINE))
self.tray_icon.disable_actions(False)
self.tray_icon.update_last_login('never')
def _to_error_state(self):
self._to_reset_state()
self.tray_icon.setIcon(self.assets.get_icon(style=ICON_STYLE_ERROR, color_code='#ff0000'))
def _check_and_signal_error(self, result: Result):
if result.was_error:
self._signal_error(result.error_message)
return False
return True
def _signal_error(self, message='unknown error'):
self._signal(topic='Logsmith Error', message=message)
self._to_error_state()
def _signal(self, topic, message):
self.tray_icon.show_message(topic, message)
@staticmethod
def get_timestamp():
return datetime.now().strftime('%H:%M')
def stop_and_exit(self):
self.login_repeater.stop()
self.exit()
@staticmethod
def exit():
QCoreApplication.exit()