Skip to content

Commit 380c7ec

Browse files
author
Craig Ringer
committed
A libsecret example for Python
1 parent fd71dff commit 380c7ec

File tree

1 file changed

+265
-0
lines changed

1 file changed

+265
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
"""
2+
Use strongest available credentials store in simple interface.
3+
4+
Will use libsecret if available, otherwise falls back to a yaml file with
5+
permissions restrictions and safe overwrite.
6+
7+
Usage:
8+
9+
import logging
10+
import secret_store
11+
12+
def main():
13+
logging.basicConfig(level='INFO')
14+
secret_store.initialize("my_app_name")
15+
16+
secret_store.store("key", "secretwhisper")
17+
whispers = secret_store.lookup("key")
18+
"""
19+
20+
import os
21+
import logging
22+
import yaml
23+
24+
logger = logging.getLogger(__name__)
25+
26+
class BaseSecretLookup:
27+
28+
def __init__(self, description=None, can_store = True):
29+
if description is None:
30+
description = "provider " + self.name
31+
self.description = description
32+
self._can_store = can_store
33+
34+
def _check_lookup(self, key):
35+
if not isinstance(key, str):
36+
raise TypeError("key must be 'str', but got key type {}".format(type(key)))
37+
if not key:
38+
raise ValueError("key must be non-empty string")
39+
40+
def _check_store(self, key, secret):
41+
if not isinstance(key, str) or not isinstance(secret, str):
42+
raise TypeError("key and secret must be strings, but got key {} secret {}".format(type(key), type(secret)))
43+
if not key:
44+
raise ValueError("key must be non-empty string")
45+
if not secret:
46+
raise ValueError("secret must be non-empty string")
47+
48+
@classmethod
49+
def is_usable(self):
50+
"""Can this provider be used?"""
51+
return True
52+
53+
@property
54+
def can_store(self):
55+
"""Can this provider store new secrets?"""
56+
return self._can_store
57+
58+
class EnvLookup(BaseSecretLookup):
59+
60+
name = "envvar"
61+
62+
def __init__(self, appname):
63+
super().__init__(description="envirionment variables", can_store=False)
64+
65+
def lookup(self, key):
66+
"""Dummy 'secret' lookup that checks env-vars"""
67+
super()._check_lookup(key)
68+
try:
69+
return os.environ["key"]
70+
except KeyError:
71+
pass
72+
return None
73+
74+
def store(self, key, secret):
75+
super()._check_store(key, secret)
76+
raise NotImplementedError("No support for storing creds in environment")
77+
78+
def atomic_overwrite(destpath, newcontent, mode=0o0600):
79+
"""
80+
Atomically overwrite file at destpath with newcontent in a crashsafe way so
81+
that stupid file systems don't leave us with an empty file and lost
82+
original contents.
83+
"""
84+
tmppath = os.path.join(os.path.dirname(destpath), "." + os.path.basename(destpath) + ".tmp")
85+
if os.path.exists(tmppath):
86+
os.unlink(tmppath)
87+
try:
88+
tmpfile = os.fdopen(os.open(tmppath, os.O_WRONLY|os.O_CREAT|os.O_EXCL, mode),"w")
89+
assert((os.stat(tmppath).st_mode & 0o777) & (~ mode) == 0)
90+
tmpfile.write(newcontent)
91+
tmpfile.flush()
92+
os.fsync(tmpfile.fileno())
93+
tmpfile.close()
94+
dirfd = os.open(os.path.dirname(tmppath), os.O_DIRECTORY, 0)
95+
os.fsync(dirfd)
96+
os.rename(tmppath, destpath)
97+
os.fsync(dirfd)
98+
os.close(dirfd)
99+
finally:
100+
try:
101+
os.unlink(tmppath)
102+
except OSError:
103+
pass
104+
105+
class YamlConfLookup(BaseSecretLookup):
106+
107+
name = "yaml"
108+
109+
def __init__(self, appname):
110+
self.credfile = os.path.expanduser("~/.cache/{appname}/{appname}_cred.yml".format(appname=appname))
111+
super().__init__(description = "yaml file {}".format(self.credfile))
112+
113+
def _check_credfile_permissions(self):
114+
creddir = os.path.dirname(self.credfile)
115+
if os.path.exists(creddir) and os.stat(creddir).st_mode & 0o0066 > 0:
116+
raise RuntimeError("creds dir at {} is group or world readable or writeable, refusing to use it".format(creddir))
117+
if os.path.exists(self.credfile) and os.stat(self.credfile).st_mode & 0o0066 > 0:
118+
raise RuntimeError("creds file at {} is group or world readable or writeable, refusing to use it".format(self.credfile))
119+
120+
def lookup(self, key):
121+
"""Dummy 'secret' lookup that checks a creds file"""
122+
super()._check_lookup(key)
123+
self._check_credfile_permissions()
124+
if os.path.exists(self.credfile):
125+
credmap = yaml.load(open(self.credfile,"r"), Loader=yaml.SafeLoader)
126+
if key in credmap:
127+
return credmap[key]
128+
return None
129+
130+
def store(self, key, secret):
131+
super()._check_store(key, secret)
132+
os.makedirs(os.path.dirname(self.credfile), mode=0o0700, exist_ok=True)
133+
self._check_credfile_permissions()
134+
if os.path.exists(self.credfile):
135+
credmap = yaml.load(open(self.credfile,"r"), Loader=yaml.SafeLoader)
136+
else:
137+
credmap = {}
138+
credmap[key] = secret
139+
atomic_overwrite(self.credfile, yaml.dump(credmap, Dumper=yaml.SafeDumper))
140+
self._check_credfile_permissions()
141+
142+
class LibsecretLookup(BaseSecretLookup):
143+
name = "libsecret"
144+
145+
# For loading libsecret support dynamically
146+
_have_libsecret = None
147+
_Secret = None
148+
149+
@classmethod
150+
def _import_libsecret(klass):
151+
"""
152+
Import libsecret support if possible, and set _have_libsecret. If
153+
loaded, set _Secret to the gi.repository.Secret module.
154+
"""
155+
if klass._have_libsecret is None:
156+
try:
157+
import gi
158+
gi.require_version('Secret', '1')
159+
from gi.repository import Secret
160+
klass._Secret = Secret
161+
klass._have_libsecret = True
162+
except ImportError:
163+
logger.debug("cannot import libsecret: {}", exc_info=True)
164+
klass._have_libsecret = False
165+
166+
def __init__(self, appname):
167+
super().__init__(description = 'login keyring (libsecret)')
168+
self._import_libsecret()
169+
if not self._have_libsecret:
170+
raise RuntimeError("libsecret support not available")
171+
Secret = self._Secret
172+
173+
self._secret_schema = Secret.Schema.new(
174+
"com.2ndquadrant.{}.Store".format(appname),
175+
Secret.SchemaFlags.NONE,
176+
{ "key": Secret.SchemaAttributeType.STRING }
177+
)
178+
179+
def lookup(self, key):
180+
"""Use libsecret without schema qualification to look up the default set"""
181+
super()._check_lookup(key)
182+
Secret = self._Secret
183+
found_secret = Secret.password_lookup_sync(self._secret_schema, {'key': key}, None)
184+
# if we didn't find it we'll just return None
185+
return found_secret
186+
187+
def store(self, key, secret):
188+
"""store in libsecret"""
189+
super()._check_store(key, secret)
190+
Secret = self._Secret
191+
Secret.password_store_sync(self._secret_schema, {'key': key}, Secret.COLLECTION_DEFAULT, "tpacheck: {}".format(key), secret, None)
192+
193+
@classmethod
194+
def is_usable(klass):
195+
klass._import_libsecret()
196+
return klass._have_libsecret
197+
198+
candidate_providers = [LibsecretLookup, YamlConfLookup, EnvLookup]
199+
active_providers = []
200+
201+
def initialize(appname):
202+
"""Set up the module by specifying an application name"""
203+
global active_providers
204+
logger.debug("initialising with appname {}".format(appname))
205+
for provider in candidate_providers:
206+
if provider.is_usable():
207+
logger.debug("initialsing provider {}".format(provider.name))
208+
active_providers.append(provider(appname))
209+
logger.debug("initialised")
210+
else:
211+
logger.debug("skipping provider {}: reports not usable".format(provider.name))
212+
logger.debug("initialised")
213+
214+
def check_initialized():
215+
if len(active_providers) == 0:
216+
raise RuntimeError("call secrets.initialize(...) first")
217+
218+
def lookup_secret(key):
219+
"""Find secret and return it. This is the main module interface."""
220+
check_initialized()
221+
first_found = None
222+
found_locations = []
223+
logger.debug("Looking for secret {}".format(key))
224+
for provider in active_providers:
225+
logger.debug("Trying {}".format(provider.name))
226+
found = provider.lookup(key)
227+
if found is not None:
228+
logger.debug("Found in {}".format(provider.name))
229+
if first_found is None:
230+
first_found = found
231+
found_locations.append(provider.name)
232+
if len(found_locations) == 0:
233+
raise KeyError("Could not find secret {} in configured sources: {}".format(key, ', '.join([x.name for x in active_providers])))
234+
elif len(found_locations) > 1:
235+
logger.warning("Found same secret in multiple configured sources: {}".format(key, ', '.join([x.name for x in active_providers])))
236+
return first_found
237+
238+
def store_secret(key, secret):
239+
"""Store secret in first provider available. This is the main module store interface."""
240+
check_initialized()
241+
# Should do smart stuff like find if cred exists in other provider and complain etc
242+
# but in reality we should probably only use one provider anyhow.
243+
provider=active_providers[0]
244+
logger.debug("Storing secret {} in provider {}".format(key, provider.name))
245+
provider.store(key, secret)
246+
logger.debug("Checking round-trip retrieval of secret")
247+
if provider.lookup(key) != secret:
248+
raise RuntimeError("failed to store secret {} in provider {}: retrieval after store failed".format(key, provider.name))
249+
logger.debug("Stored")
250+
251+
def selftest():
252+
logging.basicConfig(level='DEBUG')
253+
initialize("tpacheck")
254+
255+
store_secret('TEST_TPACHECK_SECRET', 'notverysecret');
256+
assert(lookup_secret('TEST_TPACHECK_SECRET') == 'notverysecret')
257+
258+
for provider in active_providers:
259+
(k, v) = ('TEST_TPACHECK_SECRET_' + provider.__class__.__name__, 'notverysecret_' + provider.__class__.__name__)
260+
if provider.can_store:
261+
provider.store(k, v)
262+
assert(provider.lookup(k) == v)
263+
264+
if __name__ == '__main__':
265+
selftest()

0 commit comments

Comments
 (0)