-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutils_streamlit.py
103 lines (77 loc) · 3.3 KB
/
utils_streamlit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from typing import Optional, Dict, Any, IO
import enum
import requests
import requests.adapters
import rekognition.utils_alert
import streamlit_config
import streamlit as st
# Import-time sanity check on configuration: call_api() appends a path that
# always begins with '/', so a trailing slash on the root would yield '//'.
assert not streamlit_config.api_url_root.endswith('/')
class ErrorResponse(Exception):
    """Raised when the API envelope carries a non-zero ``error_code``.

    Attributes:
        error_code: The ``error_code`` value taken from the response.
        body: The ``body`` value taken from the response, or ``None``.
    """

    def __init__(self, error_code: int, body: Optional[dict]):
        # Forward both values to Exception so ``args`` is populated. With empty
        # ``args`` the exception cannot be rebuilt by pickle/copy, because they
        # re-create it as ``ErrorResponse(*args)`` and both parameters are
        # required.
        super().__init__(error_code, body)
        self.error_code = error_code
        self.body = body

    def __str__(self):
        return f'Error code: {self.error_code}. {str(self.body)}'
@rekognition.utils_alert.alert_slack_when_exception
def _fetch(session, url: str, method: str, data: Optional[Dict[str, Any]], files: Optional[Dict[str, IO]]) -> Dict[str, Any]:
    """Send one request through ``session`` and unwrap the JSON envelope.

    The response is parsed as JSON and is expected to hold an ``error_code``
    entry and a ``body`` entry.

    Args:
        session: Object exposing ``request(method=, url=, data=, files=)``.
        url: Full URL to call.
        method: HTTP method name.
        data: Form data passed to the request, or ``None``.
        files: Files passed to the request, or ``None``.

    Returns:
        The ``body`` entry of the parsed response.

    Raises:
        requests.exceptions.ConnectionError: The request could not connect;
            the message names the method and URL.
        ErrorResponse: The response's ``error_code`` is not 0.
    """
    try:
        response = session.request(method=method, url=url, data=data, files=files)
    except requests.exceptions.ConnectionError as exc:
        # Replace the low-level error with one that names the target;
        # ``from None`` deliberately drops the original exception chain.
        raise requests.exceptions.ConnectionError(
            f'Error on connect to {method} {url}. Original exception: {type(exc)}: {exc}'
        ) from None

    payload = response.json()
    status = payload['error_code']
    if status != 0:
        raise ErrorResponse(error_code=status, body=payload.get('body'))
    return payload['body']
# Module-wide HTTP session reused by call_api() for every request.
_api_session = requests.Session()
# URLs starting with the API root are routed through the adapter built by the
# project helper (presumably a retrying HTTPAdapter, going by its name --
# confirm in rekognition.utils).
# NOTE(review): only rekognition.utils_alert is imported in the visible part of
# this file; `rekognition.utils` resolves only if that submodule has been
# imported somewhere else -- confirm.
_api_session.mount(
    prefix=streamlit_config.api_url_root,
    adapter=rekognition.utils.build_retry_http_adapter(),
)
@rekognition.utils_alert.alert_slack_when_exception
def call_api(url_path: str, method: str = 'GET', data: Optional[Dict[str, Any]] = None, files: Optional[Dict[str, IO]] = None) -> Dict[str, Any]:
    """Call the API at ``url_path`` under ``streamlit_config.api_url_root``.

    Args:
        url_path: Path of the endpoint; a leading '/' is added when missing.
        method: HTTP method name, 'GET' by default.
        data: Form data forwarded to the request, or ``None``.
        files: Files forwarded to the request, or ``None``.

    Returns:
        Whatever ``_fetch`` returns for the request (the response ``body``).
    """
    normalized_path = url_path if url_path.startswith('/') else '/' + url_path
    return _fetch(
        session=_api_session,
        url=f'{streamlit_config.api_url_root}{normalized_path}',
        method=method,
        data=data,
        files=files,
    )
class AdminStatus(enum.Enum):
    """Admin login state kept in ``st.session_state.admin``."""

    # Stored by ask_admin_password() when the entered password equals the
    # expected one.
    AUTHORIZED = 'authorized'
    # Stored by ask_admin_password() when the entered password does not match.
    WRONG_TOKEN = 'wrong_token'
    # Initial value, stored before any password check has run in the session.
    NOT_YET = 'not_yet'
def ask_admin_password(password: Optional[str] = None) -> bool:
    """Render the admin password prompt and report whether the session is authorized.

    The login state lives in ``st.session_state.admin`` as an ``AdminStatus``.
    When ``password`` is not supplied, the admin name is read from the
    ``admin`` URL query parameter and the expected password is looked up under
    that name in ``st.secrets.admin``.

    Args:
        password: Expected password. ``None`` (default) means "look it up in
            ``st.secrets.admin`` using the ``admin`` query parameter".

    Returns:
        ``True`` when the session state is ``AdminStatus.AUTHORIZED`` after
        this call, ``False`` otherwise (including when no admin name was
        given in the URL).
    """
    if 'admin' not in st.session_state:
        st.session_state.admin = AdminStatus.NOT_YET

    # One placeholder element: it shows either a hint, the password prompt, or
    # the "In Admin Mode" text, each replacing the previous content.
    admin_auth = st.empty()

    # False when no expected password exists for the requested admin name, in
    # which case no entered value may be accepted.
    can_match = True

    if password is None:
        query_params = st.experimental_get_query_params()
        if 'admin' not in query_params:
            admin_auth.write('Please input username')
            return False
        username = query_params['admin'][0]
        try:
            password = st.secrets.admin[username]
        except KeyError:
            # Unknown admin name in the URL: previously this KeyError escaped
            # and crashed the page. Treat it as a login that can never match.
            can_match = False
    else:
        username = 'OVERRIDDEN'

    if st.session_state.admin != AdminStatus.AUTHORIZED:
        admin_auth.text_input(label=f'{username}', placeholder='password', key='admin_auth_password', type='password')
        # NOTE(review): plain == is not a constant-time comparison; consider
        # hmac.compare_digest if the stored secrets are guaranteed to be str.
        if can_match and st.session_state.admin_auth_password == password:
            st.session_state.admin = AdminStatus.AUTHORIZED
        else:
            # Also reached while the prompt is still empty, so NOT_YET is
            # replaced on the first run that renders the prompt.
            st.session_state.admin = AdminStatus.WRONG_TOKEN

    if st.session_state.admin == AdminStatus.AUTHORIZED:
        # Overwrites the prompt in the placeholder.
        admin_auth.write('In Admin Mode')
        return True
    return False
def ask_admin_password_if_needed(password: Optional[str] = None) -> bool:
    """Run the admin password check only when the URL asks for admin mode.

    Args:
        password: Forwarded unchanged to ``ask_admin_password``.

    Returns:
        ``True`` when the URL has no ``admin`` query parameter; otherwise the
        result of ``ask_admin_password``.
    """
    if 'admin' not in st.experimental_get_query_params():
        # No admin mode requested: nothing to check.
        return True
    return ask_admin_password(password=password)
if __name__ == '__main__':
    # Run directly as a script: show the page content only when the admin
    # gate lets the session through.
    _gate_passed = ask_admin_password_if_needed()
    if _gate_passed:
        st.write('main')