From f79ecaa108dfb114e447165b6914d0de7207f73f Mon Sep 17 00:00:00 2001 From: shindora Date: Mon, 2 Dec 2024 11:38:52 +0700 Subject: [PATCH] Typo in retires vs retries --- splunklib/binding.py | 257 +++++++++----- splunklib/client.py | 780 ++++++++++++++++++++++++++----------------- 2 files changed, 632 insertions(+), 405 deletions(-) diff --git a/splunklib/binding.py b/splunklib/binding.py index 25a09948..e1d8412b 100644 --- a/splunklib/binding.py +++ b/splunklib/binding.py @@ -55,12 +55,26 @@ "_encode", "_make_cookie_header", "_NoAuthenticationToken", - "namespace" + "namespace", ] -SENSITIVE_KEYS = ['Authorization', 'Cookie', 'action.email.auth_password', 'auth', 'auth_password', 'clear_password', 'clientId', - 'crc-salt', 'encr_password', 'oldpassword', 'passAuth', 'password', 'session', 'suppressionKey', - 'token'] +SENSITIVE_KEYS = [ + "Authorization", + "Cookie", + "action.email.auth_password", + "auth", + "auth_password", + "clear_password", + "clientId", + "crc-salt", + "encr_password", + "oldpassword", + "passAuth", + "password", + "session", + "suppressionKey", + "token", +] # If you change these, update the docstring # on _authority as well. @@ -82,9 +96,9 @@ def new_f(*args, **kwargs): def mask_sensitive_data(data): - ''' + """ Masked sensitive fields data for logging purpose - ''' + """ if not isinstance(data, dict): try: data = json.loads(data) @@ -193,7 +207,7 @@ class UrlEncoded(str): 'ab c' + UrlEncoded('de f') == UrlEncoded('ab cde f') """ - def __new__(self, val='', skip_encode=False, encode_slash=False): + def __new__(self, val="", skip_encode=False, encode_slash=False): if isinstance(val, UrlEncoded): # Don't urllib.quote something already URL encoded. return val @@ -326,11 +340,14 @@ def wrapper(self, *args, **kwargs): # an AuthenticationError and give up. with _handle_auth_error("Autologin failed."): self.login() - with _handle_auth_error("Authentication Failed! If session token is used, it seems to have been expired."): + with _handle_auth_error( + "Authentication Failed! If session token is used, it seems to have been expired." + ): return request_fun(self, *args, **kwargs) elif he.status == 401 and not self.autologin: raise AuthenticationError( - "Request failed: Session is not logged in.", he) + "Request failed: Session is not logged in.", he + ) else: raise @@ -376,10 +393,10 @@ def _authority(scheme=DEFAULT_SCHEME, host=DEFAULT_HOST, port=DEFAULT_PORT): """ # check if host is an IPv6 address and not enclosed in [ ] - if ':' in host and not (host.startswith('[') and host.endswith(']')): + if ":" in host and not (host.startswith("[") and host.endswith("]")): # IPv6 addresses must be enclosed in [ ] in order to be well # formed. 
- host = '[' + host + ']' + host = "[" + host + "]" return UrlEncoded(f"{scheme}://{host}:{port}", skip_encode=True) @@ -436,11 +453,11 @@ def namespace(sharing=None, owner=None, app=None, **kwargs): n = binding.namespace(sharing="global", app="search") """ if sharing in ["system"]: - return record({'sharing': sharing, 'owner': "nobody", 'app': "system"}) + return record({"sharing": sharing, "owner": "nobody", "app": "system"}) if sharing in ["global", "app"]: - return record({'sharing': sharing, 'owner': "nobody", 'app': app}) + return record({"sharing": sharing, "owner": "nobody", "app": app}) if sharing in ["user", None]: - return record({'sharing': sharing, 'owner': owner, 'app': app}) + return record({"sharing": sharing, "owner": owner, "app": app}) raise ValueError("Invalid value for argument: 'sharing'") @@ -487,7 +504,7 @@ class Context: :type splunkToken: ``string`` :param headers: List of extra HTTP headers to send (optional). :type headers: ``list`` of 2-tuples. - :param retires: Number of retries for each HTTP connection (optional, the default is 0). + :param retries: Number of retries for each HTTP connection (optional, the default is 0). NOTE THAT THIS MAY INCREASE THE NUMBER OF ROUND TRIP CONNECTIONS TO THE SPLUNK SERVER AND BLOCK THE CURRENT THREAD WHILE RETRYING. :type retries: ``int`` @@ -510,10 +527,16 @@ class Context: """ def __init__(self, handler=None, **kwargs): - self.http = HttpLib(handler, kwargs.get("verify", False), key_file=kwargs.get("key_file"), - cert_file=kwargs.get("cert_file"), context=kwargs.get("context"), - # Default to False for backward compat - retries=kwargs.get("retries", 0), retryDelay=kwargs.get("retryDelay", 10)) + self.http = HttpLib( + handler, + kwargs.get("verify", False), + key_file=kwargs.get("key_file"), + cert_file=kwargs.get("cert_file"), + context=kwargs.get("context"), + # Default to False for backward compat + retries=kwargs.get("retries", 0), + retryDelay=kwargs.get("retryDelay", 10), + ) self.token = kwargs.get("token", _NoAuthenticationToken) if self.token is None: # In case someone explicitly passes token=None self.token = _NoAuthenticationToken @@ -531,7 +554,10 @@ def __init__(self, handler=None, **kwargs): self._self_signed_certificate = kwargs.get("self_signed_certificate", True) # Store any cookies in the self.http._cookies dict - if "cookie" in kwargs and kwargs['cookie'] not in [None, _NoAuthenticationToken]: + if "cookie" in kwargs and kwargs["cookie"] not in [ + None, + _NoAuthenticationToken, + ]: _parse_cookies(kwargs["cookie"], self.http._cookies) def get_cookies(self): @@ -568,19 +594,21 @@ def _auth_headers(self): elif self.basic and (self.username and self.password): token = f'Basic {b64encode(("%s:%s" % (self.username, self.password)).encode("utf-8")).decode("ascii")}' elif self.bearerToken: - token = f'Bearer {self.bearerToken}' + token = f"Bearer {self.bearerToken}" elif self.token is _NoAuthenticationToken: token = [] else: # Ensure the token is properly formatted - if self.token.startswith('Splunk '): + if self.token.startswith("Splunk "): token = self.token else: - token = f'Splunk {self.token}' + token = f"Splunk {self.token}" if token: header.append(("Authorization", token)) if self.get_cookies(): - header.append(("Cookie", _make_cookie_header(list(self.get_cookies().items())))) + header.append( + ("Cookie", _make_cookie_header(list(self.get_cookies().items()))) + ) return header @@ -610,7 +638,9 @@ def connect(self): context = ssl.create_default_context() context.options |= ssl.OP_NO_TLSv1 | 
ssl.OP_NO_TLSv1_1 context.check_hostname = not self._self_signed_certificate - context.verify_mode = ssl.CERT_NONE if self._self_signed_certificate else ssl.CERT_REQUIRED + context.verify_mode = ( + ssl.CERT_NONE if self._self_signed_certificate else ssl.CERT_REQUIRED + ) sock = context.wrap_socket(sock, server_hostname=self.host) sock.connect((socket.gethostbyname(self.host), self.port)) return sock @@ -667,15 +697,20 @@ def delete(self, path_segment, owner=None, app=None, sharing=None, **query): c.logout() c.delete('apps/local') # raises AuthenticationError """ - path = self.authority + self._abspath(path_segment, owner=owner, - app=app, sharing=sharing) - logger.debug("DELETE request to %s (body: %s)", path, mask_sensitive_data(query)) + path = self.authority + self._abspath( + path_segment, owner=owner, app=app, sharing=sharing + ) + logger.debug( + "DELETE request to %s (body: %s)", path, mask_sensitive_data(query) + ) response = self.http.delete(path, self._auth_headers, **query) return response @_authentication @_log_duration - def get(self, path_segment, owner=None, app=None, headers=None, sharing=None, **query): + def get( + self, path_segment, owner=None, app=None, headers=None, sharing=None, **query + ): """Performs a GET operation from the REST path segment with the given namespace and query. @@ -730,8 +765,9 @@ def get(self, path_segment, owner=None, app=None, headers=None, sharing=None, ** if headers is None: headers = [] - path = self.authority + self._abspath(path_segment, owner=owner, - app=app, sharing=sharing) + path = self.authority + self._abspath( + path_segment, owner=owner, app=app, sharing=sharing + ) logger.debug("GET request to %s (body: %s)", path, mask_sensitive_data(query)) all_headers = headers + self.additional_headers + self._auth_headers response = self.http.get(path, all_headers, **query) @@ -739,7 +775,9 @@ def get(self, path_segment, owner=None, app=None, headers=None, sharing=None, ** @_authentication @_log_duration - def post(self, path_segment, owner=None, app=None, sharing=None, headers=None, **query): + def post( + self, path_segment, owner=None, app=None, sharing=None, headers=None, **query + ): """Performs a POST operation from the REST path segment with the given namespace and query. @@ -809,7 +847,9 @@ def post(self, path_segment, owner=None, app=None, sharing=None, headers=None, * if headers is None: headers = [] - path = self.authority + self._abspath(path_segment, owner=owner, app=app, sharing=sharing) + path = self.authority + self._abspath( + path_segment, owner=owner, app=app, sharing=sharing + ) logger.debug("POST request to %s (body: %s)", path, mask_sensitive_data(query)) all_headers = headers + self.additional_headers + self._auth_headers @@ -818,8 +858,16 @@ def post(self, path_segment, owner=None, app=None, sharing=None, headers=None, * @_authentication @_log_duration - def request(self, path_segment, method="GET", headers=None, body={}, - owner=None, app=None, sharing=None): + def request( + self, + path_segment, + method="GET", + headers=None, + body={}, + owner=None, + app=None, + sharing=None, + ): """Issues an arbitrary HTTP request to the REST path segment. This method is named to match ``httplib.request``. 
This function @@ -872,27 +920,28 @@ def request(self, path_segment, method="GET", headers=None, body={}, if headers is None: headers = [] - path = self.authority \ - + self._abspath(path_segment, owner=owner, - app=app, sharing=sharing) + path = self.authority + self._abspath( + path_segment, owner=owner, app=app, sharing=sharing + ) all_headers = headers + self.additional_headers + self._auth_headers - logger.debug("%s request to %s (headers: %s, body: %s)", - method, path, str(mask_sensitive_data(dict(all_headers))), mask_sensitive_data(body)) + logger.debug( + "%s request to %s (headers: %s, body: %s)", + method, + path, + str(mask_sensitive_data(dict(all_headers))), + mask_sensitive_data(body), + ) if body: body = _encode(**body) if method == "GET": - path = path + UrlEncoded('?' + body, skip_encode=True) - message = {'method': method, - 'headers': all_headers} + path = path + UrlEncoded("?" + body, skip_encode=True) + message = {"method": method, "headers": all_headers} else: - message = {'method': method, - 'headers': all_headers, - 'body': body} + message = {"method": method, "headers": all_headers, "body": body} else: - message = {'method': method, - 'headers': all_headers} + message = {"method": method, "headers": all_headers} response = self.http.request(path, message) @@ -918,15 +967,15 @@ def login(self): # Then issue requests... """ - if self.has_cookies() and \ - (not self.username and not self.password): + if self.has_cookies() and (not self.username and not self.password): # If we were passed session cookie(s), but no username or # password, then login is a nop, since we're automatically # logged in. return - if self.token is not _NoAuthenticationToken and \ - (not self.username and not self.password): + if self.token is not _NoAuthenticationToken and ( + not self.username and not self.password + ): # If we were passed a session token, but no username or # password, then login is a nop, since we're automatically # logged in. @@ -948,7 +997,8 @@ def login(self): username=self.username, password=self.password, headers=self.additional_headers, - cookie="1") # In Splunk 6.2+, passing "cookie=1" will return the "set-cookie" header + cookie="1", + ) # In Splunk 6.2+, passing "cookie=1" will return the "set-cookie" header body = response.body.read() session = XML(body).findtext("./sessionKey") @@ -966,8 +1016,7 @@ def logout(self): self.http._cookies = {} return self - def _abspath(self, path_segment, - owner=None, app=None, sharing=None): + def _abspath(self, path_segment, owner=None, app=None, sharing=None): """Qualifies *path_segment* into an absolute path for a URL. If *path_segment* is already absolute, returns it unchanged. @@ -1004,7 +1053,7 @@ def _abspath(self, path_segment, skip_encode = isinstance(path_segment, UrlEncoded) # If path_segment is absolute, escape all forbidden characters # in it and return it. - if path_segment.startswith('/'): + if path_segment.startswith("/"): return UrlEncoded(path_segment, skip_encode=skip_encode) # path_segment is relative, so we need a namespace to build an @@ -1023,7 +1072,9 @@ def _abspath(self, path_segment, oname = "nobody" if ns.owner is None else ns.owner aname = "system" if ns.app is None else ns.app - path = UrlEncoded(f"/servicesNS/{oname}/{aname}/{path_segment}", skip_encode=skip_encode) + path = UrlEncoded( + f"/servicesNS/{oname}/{aname}/{path_segment}", skip_encode=skip_encode + ) return path @@ -1136,6 +1187,7 @@ def __init__(self, message, cause): # } # + # Encode the given kwargs as a query string. 
This wrapper will also _encode # a list value as a sequence of assignments to the corresponding arg name, # for example an argument such as 'foo=[1,2,3]' will be encoded as @@ -1155,10 +1207,16 @@ def _spliturl(url): parsed_url = parse.urlparse(url) host = parsed_url.hostname port = parsed_url.port - path = '?'.join((parsed_url.path, parsed_url.query)) if parsed_url.query else parsed_url.path + path = ( + "?".join((parsed_url.path, parsed_url.query)) + if parsed_url.query + else parsed_url.path + ) # Strip brackets if its an IPv6 address - if host.startswith('[') and host.endswith(']'): host = host[1:-1] - if port is None: port = DEFAULT_PORT + if host.startswith("[") and host.endswith("]"): + host = host[1:-1] + if port is None: + port = DEFAULT_PORT return parsed_url.scheme, host, port, path @@ -1207,10 +1265,20 @@ class HttpLib: If using the default handler, SSL verification can be disabled by passing verify=False. """ - def __init__(self, custom_handler=None, verify=False, key_file=None, cert_file=None, context=None, retries=0, - retryDelay=10): + def __init__( + self, + custom_handler=None, + verify=False, + key_file=None, + cert_file=None, + context=None, + retries=0, + retryDelay=10, + ): if custom_handler is None: - self.handler = handler(verify=verify, key_file=key_file, cert_file=cert_file, context=context) + self.handler = handler( + verify=verify, key_file=key_file, cert_file=cert_file, context=context + ) else: self.handler = custom_handler self._cookies = {} @@ -1234,15 +1302,16 @@ def delete(self, url, headers=None, **kwargs): its structure). :rtype: ``dict`` """ - if headers is None: headers = [] + if headers is None: + headers = [] if kwargs: # url is already a UrlEncoded. We have to manually declare # the query to be encoded or it will get automatically URL # encoded by being appended to url. - url = url + UrlEncoded('?' + _encode(**kwargs), skip_encode=True) + url = url + UrlEncoded("?" + _encode(**kwargs), skip_encode=True) message = { - 'method': "DELETE", - 'headers': headers, + "method": "DELETE", + "headers": headers, } return self.request(url, message) @@ -1263,13 +1332,14 @@ def get(self, url, headers=None, **kwargs): its structure). :rtype: ``dict`` """ - if headers is None: headers = [] + if headers is None: + headers = [] if kwargs: # url is already a UrlEncoded. We have to manually declare # the query to be encoded or it will get automatically URL # encoded by being appended to url. - url = url + UrlEncoded('?' + _encode(**kwargs), skip_encode=True) - return self.request(url, {'method': "GET", 'headers': headers}) + url = url + UrlEncoded("?" + _encode(**kwargs), skip_encode=True) + return self.request(url, {"method": "GET", "headers": headers}) def post(self, url, headers=None, **kwargs): """Sends a POST request to a URL. @@ -1289,29 +1359,26 @@ def post(self, url, headers=None, **kwargs): its structure). :rtype: ``dict`` """ - if headers is None: headers = [] + if headers is None: + headers = [] # We handle GET-style arguments and an unstructured body. This is here # to support the receivers/stream endpoint. - if 'body' in kwargs: + if "body" in kwargs: # We only use application/x-www-form-urlencoded if there is no other # Content-Type header present. This can happen in cases where we # send requests as application/json, e.g. for KV Store. 
if len([x for x in headers if x[0].lower() == "content-type"]) == 0: headers.append(("Content-Type", "application/x-www-form-urlencoded")) - body = kwargs.pop('body') + body = kwargs.pop("body") if isinstance(body, dict): - body = _encode(**body).encode('utf-8') + body = _encode(**body).encode("utf-8") if len(kwargs) > 0: - url = url + UrlEncoded('?' + _encode(**kwargs), skip_encode=True) + url = url + UrlEncoded("?" + _encode(**kwargs), skip_encode=True) else: - body = _encode(**kwargs).encode('utf-8') - message = { - 'method': "POST", - 'headers': headers, - 'body': body - } + body = _encode(**kwargs).encode("utf-8") + message = {"method": "POST", "headers": headers, "body": body} return self.request(url, message) def request(self, url, message, **kwargs): @@ -1372,10 +1439,10 @@ class ResponseReader(io.RawIOBase): def __init__(self, response, connection=None): self._response = response self._connection = connection - self._buffer = b'' + self._buffer = b"" def __str__(self): - return str(self.read(), 'UTF-8') + return str(self.read(), "UTF-8") @property def empty(self): @@ -1410,18 +1477,18 @@ def read(self, size=None): """ r = self._buffer - self._buffer = b'' + self._buffer = b"" if size is not None: size -= len(r) r = r + self._response.read(size) return r def readable(self): - """ Indicates that the response reader is readable.""" + """Indicates that the response reader is readable.""" return True def readinto(self, byte_array): - """ Read data into a byte array, upto the size of the byte array. + """Read data into a byte array, upto the size of the byte array. :param byte_array: A byte array/memory view to pour bytes into. :type byte_array: ``bytearray`` or ``memoryview`` @@ -1452,18 +1519,21 @@ def handler(key_file=None, cert_file=None, timeout=None, verify=False, context=N def connect(scheme, host, port): kwargs = {} - if timeout is not None: kwargs['timeout'] = timeout + if timeout is not None: + kwargs["timeout"] = timeout if scheme == "http": return client.HTTPConnection(host, port, **kwargs) if scheme == "https": - if key_file is not None: kwargs['key_file'] = key_file - if cert_file is not None: kwargs['cert_file'] = cert_file + if key_file is not None: + kwargs["key_file"] = key_file + if cert_file is not None: + kwargs["cert_file"] = cert_file if not verify: - kwargs['context'] = ssl._create_unverified_context() + kwargs["context"] = ssl._create_unverified_context() elif context: # verify is True in elif branch and context is not None - kwargs['context'] = context + kwargs["context"] = context return client.HTTPSConnection(host, port, **kwargs) raise ValueError(f"unsupported scheme: {scheme}") @@ -1489,7 +1559,10 @@ def request(url, message, **kwargs): if timeout is not None: connection.sock.settimeout(timeout) response = connection.getresponse() - is_keepalive = "keep-alive" in response.getheader("connection", default="close").lower() + is_keepalive = ( + "keep-alive" + in response.getheader("connection", default="close").lower() + ) finally: if not is_keepalive: connection.close() diff --git a/splunklib/client.py b/splunklib/client.py index ee390c9e..7a56250b 100644 --- a/splunklib/client.py +++ b/splunklib/client.py @@ -70,9 +70,16 @@ from splunklib import data from splunklib.data import record -from splunklib.binding import (AuthenticationError, Context, HTTPError, UrlEncoded, - _encode, _make_cookie_header, _NoAuthenticationToken, - namespace) +from splunklib.binding import ( + AuthenticationError, + Context, + HTTPError, + UrlEncoded, + _encode, + 
_make_cookie_header, + _NoAuthenticationToken, + namespace, +) logger = logging.getLogger(__name__) @@ -83,7 +90,7 @@ "IncomparableException", "Service", "namespace", - "AuthenticationError" + "AuthenticationError", ] PATH_APPS = "apps/local/" @@ -175,7 +182,7 @@ def _trailing(template, *targets): n = s.find(t) if n == -1: raise ValueError("Target " + t + " not found in template.") - s = s[n + len(t):] + s = s[n + len(t) :] return s @@ -183,13 +190,17 @@ def _trailing(template, *targets): def _filter_content(content, *args): if len(args) > 0: return record((k, content[k]) for k in args) - return record((k, v) for k, v in content.items() - if k not in ['eai:acl', 'eai:attributes', 'type']) + return record( + (k, v) + for k, v in content.items() + if k not in ["eai:acl", "eai:attributes", "type"] + ) # Construct a resource path from the given base path + resource name def _path(base, name): - if not base.endswith('/'): base = base + '/' + if not base.endswith("/"): + base = base + "/" return base + name @@ -197,26 +208,27 @@ def _path(base, name): # this will ultimately be sent to an xml ElementTree so we # should use the xmlcharrefreplace option def _load_atom(response, match=None): - return data.load(response.body.read() - .decode('utf-8', 'xmlcharrefreplace'), match) + return data.load(response.body.read().decode("utf-8", "xmlcharrefreplace"), match) # Load an array of atom entries from the body of the given response def _load_atom_entries(response): r = _load_atom(response) - if 'feed' in r: + if "feed" in r: # Need this to handle a random case in the REST API - if r.feed.get('totalResults') in [0, '0']: + if r.feed.get("totalResults") in [0, "0"]: return [] - entries = r.feed.get('entry', None) - if entries is None: return None + entries = r.feed.get("entry", None) + if entries is None: + return None return entries if isinstance(entries, list) else [entries] # Unlike most other endpoints, the jobs endpoint does not return # its state wrapped in another element, but at the top level. # For example, in XML, it returns ... instead of # .... 
- entries = r.get('entry', None) - if entries is None: return None + entries = r.get("entry", None) + if entries is None: + return None return entries if isinstance(entries, list) else [entries] @@ -224,63 +236,69 @@ def _load_atom_entries(response): def _load_sid(response, output_mode): if output_mode == "json": json_obj = json.loads(response.body.read()) - return json_obj.get('sid') + return json_obj.get("sid") return _load_atom(response).response.sid # Parse the given atom entry record into a generic entity state record def _parse_atom_entry(entry): - title = entry.get('title', None) + title = entry.get("title", None) - elink = entry.get('link', []) + elink = entry.get("link", []) elink = elink if isinstance(elink, list) else [elink] links = record((link.rel, link.href) for link in elink) # Retrieve entity content values - content = entry.get('content', {}) + content = entry.get("content", {}) # Host entry metadata metadata = _parse_atom_metadata(content) # Filter some of the noise out of the content record - content = record((k, v) for k, v in content.items() - if k not in ['eai:acl', 'eai:attributes']) + content = record( + (k, v) for k, v in content.items() if k not in ["eai:acl", "eai:attributes"] + ) - if 'type' in content: - if isinstance(content['type'], list): - content['type'] = [t for t in content['type'] if t != 'text/xml'] + if "type" in content: + if isinstance(content["type"], list): + content["type"] = [t for t in content["type"] if t != "text/xml"] # Unset type if it was only 'text/xml' - if len(content['type']) == 0: - content.pop('type', None) + if len(content["type"]) == 0: + content.pop("type", None) # Flatten 1 element list - if len(content['type']) == 1: - content['type'] = content['type'][0] + if len(content["type"]) == 1: + content["type"] = content["type"][0] else: - content.pop('type', None) + content.pop("type", None) - return record({ - 'title': title, - 'links': links, - 'access': metadata.access, - 'fields': metadata.fields, - 'content': content, - 'updated': entry.get("updated") - }) + return record( + { + "title": title, + "links": links, + "access": metadata.access, + "fields": metadata.fields, + "content": content, + "updated": entry.get("updated"), + } + ) # Parse the metadata fields out of the given atom entry content record def _parse_atom_metadata(content): # Hoist access metadata - access = content.get('eai:acl', None) + access = content.get("eai:acl", None) # Hoist content metadata (and cleanup some naming) - attributes = content.get('eai:attributes', {}) - fields = record({ - 'required': attributes.get('requiredFields', []), - 'optional': attributes.get('optionalFields', []), - 'wildcard': attributes.get('wildcardFields', [])}) + attributes = content.get("eai:attributes", {}) + fields = record( + { + "required": attributes.get("requiredFields", []), + "optional": attributes.get("optionalFields", []), + "wildcard": attributes.get("wildcardFields", []), + } + ) - return record({'access': access, 'fields': fields}) + return record({"access": access, "fields": fields}) # kwargs: scheme, host, port, app, owner, username, password @@ -319,7 +337,7 @@ def connect(**kwargs): :type username: ``string`` :param `password`: The password for the Splunk account. :type password: ``string`` - :param retires: Number of retries for each HTTP connection (optional, the default is 0). + :param retries: Number of retries for each HTTP connection (optional, the default is 0). NOTE THAT THIS MAY INCREASE THE NUMBER OF ROUND TRIP CONNECTIONS TO THE SPLUNK SERVER. 
:type retries: ``int`` :param retryDelay: How long to wait between connection attempts if `retries` > 0 (optional, defaults to 10s). @@ -392,7 +410,7 @@ class Service(_BaseService): :param `password`: The password, which is used to authenticate the Splunk instance. :type password: ``string`` - :param retires: Number of retries for each HTTP connection (optional, the default is 0). + :param retries: Number of retries for each HTTP connection (optional, the default is 0). NOTE THAT THIS MAY INCREASE THE NUMBER OF ROUND TRIP CONNECTIONS TO THE SPLUNK SERVER. :type retries: ``int`` :param retryDelay: How long to wait between connection attempts if `retries` > 0 (optional, defaults to 10s). @@ -533,7 +551,9 @@ def modular_input_kinds(self): """ if self.splunk_version >= (5,): return ReadOnlyCollection(self, PATH_MODULAR_INPUTS, item=ModularInputKind) - raise IllegalOperationException("Modular inputs are not supported before Splunk version 5.") + raise IllegalOperationException( + "Modular inputs are not supported before Splunk version 5." + ) @property def storage_passwords(self): @@ -583,7 +603,11 @@ def restart(self, timeout=None): :param timeout: A timeout period, in seconds. :type timeout: ``integer`` """ - msg = {"value": "Restart requested by " + self.username + "via the Splunk SDK for Python"} + msg = { + "value": "Restart requested by " + + self.username + + "via the Splunk SDK for Python" + } # This message will be deleted once the server actually restarts. self.messages.create(name="restart_required", **msg) result = self.post("/services/server/control/restart") @@ -608,15 +632,15 @@ def restart_required(self): """ response = self.get("messages").body.read() - messages = data.load(response)['feed'] - if 'entry' not in messages: + messages = data.load(response)["feed"] + if "entry" not in messages: result = False else: - if isinstance(messages['entry'], dict): - titles = [messages['entry']['title']] + if isinstance(messages["entry"], dict): + titles = [messages["entry"]["title"]] else: - titles = [x['title'] for x in messages['entry']] - result = 'restart_required' in titles + titles = [x["title"] for x in messages["entry"]] + result = "restart_required" in titles return result @property @@ -696,24 +720,26 @@ def splunk_version(self): :return: A ``tuple`` of ``integers``. """ if self._splunk_version is None: - self._splunk_version = tuple(int(p) for p in self.info['version'].split('.')) + self._splunk_version = tuple( + int(p) for p in self.info["version"].split(".") + ) return self._splunk_version @property def splunk_instance(self): - if self._instance_type is None : + if self._instance_type is None: splunk_info = self.info - if hasattr(splunk_info, 'instance_type') : - self._instance_type = splunk_info['instance_type'] + if hasattr(splunk_info, "instance_type"): + self._instance_type = splunk_info["instance_type"] else: - self._instance_type = '' + self._instance_type = "" return self._instance_type @property def disable_v2_api(self): - if self.splunk_instance.lower() == 'cloud': - return self.splunk_version < (9,0,2209) - return self.splunk_version < (9,0,2) + if self.splunk_instance.lower() == "cloud": + return self.splunk_version < (9, 0, 2209) + return self.splunk_version < (9, 0, 2) @property def kvstore_owner(self): @@ -742,7 +768,7 @@ def kvstore(self): :return: A :class:`KVStoreCollections` collection of :class:`KVStoreCollection` entities. 
""" - self.namespace['owner'] = self.kvstore_owner + self.namespace["owner"] = self.kvstore_owner return KVStoreCollections(self) @property @@ -779,7 +805,9 @@ def get_api_version(self, path): # For example, "/services/search/jobs" is using API v1 api_version = 1 - versionSearch = re.search('(?:servicesNS\/[^/]+\/[^/]+|services)\/[^/]+\/v(\d+)\/', path) + versionSearch = re.search( + "(?:servicesNS\/[^/]+\/[^/]+|services)\/[^/]+\/v(\d+)\/", path + ) if versionSearch: api_version = int(versionSearch.group(1)) @@ -838,13 +866,14 @@ def get(self, path_segment="", owner=None, app=None, sharing=None, **query): # self.path to the Endpoint is relative in the SDK, so passing # owner, app, sharing, etc. along will produce the correct # namespace in the final request. - if path_segment.startswith('/'): + if path_segment.startswith("/"): path = path_segment else: - if not self.path.endswith('/') and path_segment != "": - self.path = self.path + '/' - path = self.service._abspath(self.path + path_segment, owner=owner, - app=app, sharing=sharing) + if not self.path.endswith("/") and path_segment != "": + self.path = self.path + "/" + path = self.service._abspath( + self.path + path_segment, owner=owner, app=app, sharing=sharing + ) # ^-- This was "%s%s" % (self.path, path_segment). # That doesn't work, because self.path may be UrlEncoded. @@ -859,13 +888,13 @@ def get(self, path_segment="", owner=None, app=None, sharing=None, **query): if api_version == 1: if isinstance(path, UrlEncoded): - path = UrlEncoded(path.replace(PATH_JOBS_V2, PATH_JOBS), skip_encode=True) + path = UrlEncoded( + path.replace(PATH_JOBS_V2, PATH_JOBS), skip_encode=True + ) else: path = path.replace(PATH_JOBS_V2, PATH_JOBS) - return self.service.get(path, - owner=owner, app=app, sharing=sharing, - **query) + return self.service.get(path, owner=owner, app=app, sharing=sharing, **query) def post(self, path_segment="", owner=None, app=None, sharing=None, **query): """Performs a POST operation on the path segment relative to this endpoint. @@ -916,12 +945,14 @@ def post(self, path_segment="", owner=None, app=None, sharing=None, **query): s.logout() apps.get() # raises AuthenticationError """ - if path_segment.startswith('/'): + if path_segment.startswith("/"): path = path_segment else: - if not self.path.endswith('/') and path_segment != "": - self.path = self.path + '/' - path = self.service._abspath(self.path + path_segment, owner=owner, app=app, sharing=sharing) + if not self.path.endswith("/") and path_segment != "": + self.path = self.path + "/" + path = self.service._abspath( + self.path + path_segment, owner=owner, app=app, sharing=sharing + ) # Get the API version from the path api_version = self.get_api_version(path) @@ -934,7 +965,9 @@ def post(self, path_segment="", owner=None, app=None, sharing=None, **query): if api_version == 1: if isinstance(path, UrlEncoded): - path = UrlEncoded(path.replace(PATH_JOBS_V2, PATH_JOBS), skip_encode=True) + path = UrlEncoded( + path.replace(PATH_JOBS_V2, PATH_JOBS), skip_encode=True + ) else: path = path.replace(PATH_JOBS_V2, PATH_JOBS) @@ -971,6 +1004,7 @@ class Entity(Endpoint): does not contact the server. If you think the values on the server have changed, call the :meth:`Entity.refresh` method. """ + # Not every endpoint in the API is an Entity or a Collection. 
For # example, a saved search at saved/searches/{name} has an additional # method saved/searches/{name}/scheduled_times, but this isn't an @@ -1005,8 +1039,8 @@ class Entity(Endpoint): def __init__(self, service, path, **kwargs): Endpoint.__init__(self, service, path) self._state = None - if not kwargs.get('skip_refresh', False): - self.refresh(kwargs.get('state', None)) # "Prefresh" + if not kwargs.get("skip_refresh", False): + self.refresh(kwargs.get("state", None)) # "Prefresh" def __contains__(self, item): try: @@ -1036,7 +1070,9 @@ def __eq__(self, other): Makes no roundtrips to the server. """ - raise IncomparableException(f"Equality is undefined for objects of class {self.__class__.__name__}") + raise IncomparableException( + f"Equality is undefined for objects of class {self.__class__.__name__}" + ) def __getattr__(self, key): # Called when an attribute was not found by the normal method. In this @@ -1058,10 +1094,11 @@ def __getitem__(self, key): def _load_atom_entry(self, response): elem = _load_atom(response, XNAME_ENTRY) if isinstance(elem, list): - apps = [ele.entry.content.get('eai:appName') for ele in elem] + apps = [ele.entry.content.get("eai:appName") for ele in elem] raise AmbiguousReferenceException( - f"Fetch from server returned multiple entries for name '{elem[0].entry.title}' in apps {apps}.") + f"Fetch from server returned multiple entries for name '{elem[0].entry.title}' in apps {apps}." + ) return elem.entry # Load the entity state record from the given response @@ -1096,13 +1133,17 @@ def _proper_namespace(self, owner=None, app=None, sharing=None): :return: """ if owner is None and app is None and sharing is None: # No namespace provided - if self._state is not None and 'access' in self._state: - return (self._state.access.owner, - self._state.access.app, - self._state.access.sharing) - return (self.service.namespace['owner'], - self.service.namespace['app'], - self.service.namespace['sharing']) + if self._state is not None and "access" in self._state: + return ( + self._state.access.owner, + self._state.access.app, + self._state.access.sharing, + ) + return ( + self.service.namespace["owner"], + self.service.namespace["app"], + self.service.namespace["sharing"], + ) return owner, app, sharing def delete(self): @@ -1115,7 +1156,9 @@ def get(self, path_segment="", owner=None, app=None, sharing=None, **query): def post(self, path_segment="", owner=None, app=None, sharing=None, **query): owner, app, sharing = self._proper_namespace(owner, app, sharing) - return super().post(path_segment, owner=owner, app=app, sharing=sharing, **query) + return super().post( + path_segment, owner=owner, app=app, sharing=sharing, **query + ) def refresh(self, state=None): """Refreshes the state of this entity. @@ -1198,14 +1241,15 @@ def name(self): return self.state.title def read(self, response): - """ Reads the current state of the entity from the server. """ + """Reads the current state of the entity from the server.""" results = self._load_state(response) # In lower layers of the SDK, we end up trying to URL encode # text to be dispatched via HTTP. However, these links are already # URL encoded when they arrive, and we need to mark them as such. 
- unquoted_links = dict((k, UrlEncoded(v, skip_encode=True)) - for k, v in results['links'].items()) - results['links'] = unquoted_links + unquoted_links = dict( + (k, UrlEncoded(v, skip_encode=True)) for k, v in results["links"].items() + ) + results["links"] = unquoted_links return results def reload(self): @@ -1249,7 +1293,8 @@ def state(self): :return: A ``dict`` containing fields and metadata for the entity. """ - if self._state is None: self.refresh() + if self._state is None: + self.refresh() return self._state def update(self, **kwargs): @@ -1282,8 +1327,10 @@ def update(self, **kwargs): # expected behavior of updating this Entity. Therefore, we # check for 'name' in kwargs and throw an error if it is # there. - if 'name' in kwargs: - raise IllegalOperationException('Cannot update the name of an Entity via the REST API.') + if "name" in kwargs: + raise IllegalOperationException( + "Cannot update the name of an Entity via the REST API." + ) self.post(**kwargs) return self @@ -1375,7 +1422,8 @@ def __getitem__(self, key): entries = self._load_list(response) if len(entries) > 1: raise AmbiguousReferenceException( - f"Found multiple entities named '{key}'; please specify a namespace.") + f"Found multiple entities named '{key}'; please specify a namespace." + ) if len(entries) == 0: raise KeyError(key) return entries[0] @@ -1445,10 +1493,10 @@ def _entity_path(self, state): # overloaded by Configurations, which has to switch its # entities' endpoints from its own properties/ to configs/. raw_path = parse.unquote(state.links.alternate) - if 'servicesNS/' in raw_path: - return _trailing(raw_path, 'servicesNS/', '/', '/') - if 'services/' in raw_path: - return _trailing(raw_path, 'services/') + if "servicesNS/" in raw_path: + return _trailing(raw_path, "servicesNS/", "/", "/") + if "services/" in raw_path: + return _trailing(raw_path, "services/") return raw_path def _load_list(self, response): @@ -1476,14 +1524,12 @@ def _load_list(self, response): # splunkd returns something that doesn't match # . 
entries = _load_atom_entries(response) - if entries is None: return [] + if entries is None: + return [] entities = [] for entry in entries: state = _parse_atom_entry(entry) - entity = self.item( - self.service, - self._entity_path(state), - state=state) + entity = self.item(self.service, self._entity_path(state), state=state) entities.append(entity) return entities @@ -1577,7 +1623,14 @@ def iter(self, offset=0, count=None, pagesize=None, **kwargs): if pagesize is None or N < pagesize: break offset += N - logger.debug("pagesize=%d, fetched=%d, offset=%d, N=%d, kwargs=%s", pagesize, fetched, offset, N, kwargs) + logger.debug( + "pagesize=%d, fetched=%d, offset=%d, N=%d, kwargs=%s", + pagesize, + fetched, + offset, + N, + kwargs, + ) # kwargs: count, offset, search, sort_dir, sort_key, sort_mode def list(self, count=None, **kwargs): @@ -1687,11 +1740,11 @@ def create(self, name, **params): """ if not isinstance(name, str): raise InvalidNameException(f"{name} is not a valid name for an entity.") - if 'namespace' in params: - namespace = params.pop('namespace') - params['owner'] = namespace.owner - params['app'] = namespace.app - params['sharing'] = namespace.sharing + if "namespace" in params: + namespace = params.pop("namespace") + params["owner"] = namespace.owner + params["app"] = namespace.app + params["sharing"] = namespace.sharing response = self.post(name=name, **params) atom = _load_atom(response, XNAME_ENTRY) if atom is None: @@ -1700,10 +1753,7 @@ def create(self, name, **params): return self[name] entry = atom.entry state = _parse_atom_entry(entry) - entity = self.item( - self.service, - self._entity_path(state), - state=state) + entity = self.item(self.service, self._entity_path(state), state=state) return entity def delete(self, name, **params): @@ -1732,11 +1782,11 @@ def delete(self, name, **params): assert 'my_saved_search' not in saved_searches """ name = UrlEncoded(name, encode_slash=True) - if 'namespace' in params: - namespace = params.pop('namespace') - params['owner'] = namespace.owner - params['app'] = namespace.app - params['sharing'] = namespace.sharing + if "namespace" in params: + namespace = params.pop("namespace") + params["owner"] = namespace.owner + params["app"] = namespace.app + params["sharing"] = namespace.sharing try: self.service.delete(_path(self.path, name), **params) except HTTPError as he: @@ -1799,15 +1849,14 @@ def get(self, name="", owner=None, app=None, sharing=None, **query): class ConfigurationFile(Collection): - """This class contains all of the stanzas from one configuration file. - """ + """This class contains all of the stanzas from one configuration file.""" # __init__'s arguments must match those of an Entity, not a # Collection, since it is being created as the elements of a # Configurations, which is a Collection subclass. 
def __init__(self, service, path, **kwargs): Collection.__init__(self, service, path, item=Stanza) - self.name = kwargs['state']['title'] + self.name = kwargs["state"]["title"] class Configurations(Collection): @@ -1821,7 +1870,7 @@ class Configurations(Collection): def __init__(self, service): Collection.__init__(self, service, PATH_PROPERTIES, item=ConfigurationFile) - if self.service.namespace.owner == '-' or self.service.namespace.app == '-': + if self.service.namespace.owner == "-" or self.service.namespace.app == "-": raise ValueError("Configurations cannot have wildcards in namespace.") def __getitem__(self, key): @@ -1834,7 +1883,9 @@ def __getitem__(self, key): # that multiple entities means a name collision, so we have to override it here. try: self.get(key) - return ConfigurationFile(self.service, PATH_CONF % key, state={'title': key}) + return ConfigurationFile( + self.service, PATH_CONF % key, state={"title": key} + ) except HTTPError as he: if he.status == 404: # No entity matching key raise KeyError(key) @@ -1853,7 +1904,7 @@ def __contains__(self, key): raise def create(self, name): - """ Creates a configuration file named *name*. + """Creates a configuration file named *name*. If there is already a configuration file with that name, the existing file is returned. @@ -1872,18 +1923,24 @@ def create(self, name): if response.status == 303: return self[name] if response.status == 201: - return ConfigurationFile(self.service, PATH_CONF % name, item=Stanza, state={'title': name}) - raise ValueError(f"Unexpected status code {response.status} returned from creating a stanza") + return ConfigurationFile( + self.service, PATH_CONF % name, item=Stanza, state={"title": name} + ) + raise ValueError( + f"Unexpected status code {response.status} returned from creating a stanza" + ) def delete(self, key): """Raises `IllegalOperationException`.""" - raise IllegalOperationException("Cannot delete configuration files from the REST API.") + raise IllegalOperationException( + "Cannot delete configuration files from the REST API." + ) def _entity_path(self, state): # Overridden to make all the ConfigurationFile objects # returned refer to the configs/ path instead of the # properties/ path used by Configrations. - return PATH_CONF % state['title'] + return PATH_CONF % state["title"] class Stanza(Entity): @@ -1905,35 +1962,39 @@ def __len__(self): # The stanza endpoint returns all the keys at the same level in the XML as the eai information # and 'disabled', so to get an accurate length, we have to filter those out and have just # the stanza keys. - return len([x for x in self._state.content.keys() - if not x.startswith('eai') and x != 'disabled']) + return len( + [ + x + for x in self._state.content.keys() + if not x.startswith("eai") and x != "disabled" + ] + ) class StoragePassword(Entity): - """This class contains a storage password. 
- """ + """This class contains a storage password.""" def __init__(self, service, path, **kwargs): - state = kwargs.get('state', None) - kwargs['skip_refresh'] = kwargs.get('skip_refresh', state is not None) + state = kwargs.get("state", None) + kwargs["skip_refresh"] = kwargs.get("skip_refresh", state is not None) super().__init__(service, path, **kwargs) self._state = state @property def clear_password(self): - return self.content.get('clear_password') + return self.content.get("clear_password") @property def encrypted_password(self): - return self.content.get('encr_password') + return self.content.get("encr_password") @property def realm(self): - return self.content.get('realm') + return self.content.get("realm") @property def username(self): - return self.content.get('username') + return self.content.get("username") class StoragePasswords(Collection): @@ -1942,12 +2003,12 @@ class StoragePasswords(Collection): """ def __init__(self, service): - if service.namespace.owner == '-' or service.namespace.app == '-': + if service.namespace.owner == "-" or service.namespace.app == "-": raise ValueError("StoragePasswords cannot have wildcards in namespace.") super().__init__(service, PATH_STORAGE_PASSWORDS, item=StoragePassword) def create(self, password, username, realm=None): - """ Creates a storage password. + """Creates a storage password. A `StoragePassword` can be identified by , or by : if the optional realm parameter is also provided. @@ -1970,11 +2031,15 @@ def create(self, password, username, realm=None): response = self.post(password=password, realm=realm, name=username) if response.status != 201: - raise ValueError(f"Unexpected status code {response.status} returned from creating a stanza") + raise ValueError( + f"Unexpected status code {response.status} returned from creating a stanza" + ) entries = _load_atom_entries(response) state = _parse_atom_entry(entries[0]) - storage_password = StoragePassword(self.service, self._entity_path(state), state=state, skip_refresh=True) + storage_password = StoragePassword( + self.service, self._entity_path(state), state=state, skip_refresh=True + ) return storage_password @@ -1999,7 +2064,11 @@ def delete(self, username, realm=None): name = username else: # Encode each component separately - name = UrlEncoded(realm, encode_slash=True) + ":" + UrlEncoded(username, encode_slash=True) + name = ( + UrlEncoded(realm, encode_slash=True) + + ":" + + UrlEncoded(username, encode_slash=True) + ) # Append the : expected at the end of the name if name[-1] != ":": @@ -2032,7 +2101,7 @@ def count(self): :return: The triggered alert count. :rtype: ``integer`` """ - return int(self.content.get('triggered_alert_count', 0)) + return int(self.content.get("triggered_alert_count", 0)) class Indexes(Collection): @@ -2041,16 +2110,16 @@ class Indexes(Collection): """ def get_default(self): - """ Returns the name of the default index. + """Returns the name of the default index. :return: The name of the default index. """ - index = self['_audit'] - return index['defaultDatabase'] + index = self["_audit"] + return index["defaultDatabase"] def delete(self, name): - """ Deletes a given index. + """Deletes a given index. **Note**: This method is only supported in Splunk 5.0 and later. 
@@ -2060,8 +2129,10 @@ def delete(self, name): if self.service.splunk_version >= (5,): Collection.delete(self, name) else: - raise IllegalOperationException("Deleting indexes via the REST API is " - "not supported before Splunk version 5.") + raise IllegalOperationException( + "Deleting indexes via the REST API is " + "not supported before Splunk version 5." + ) class Index(Entity): @@ -2084,13 +2155,22 @@ def attach(self, host=None, source=None, sourcetype=None): :return: A writable socket. """ - args = {'index': self.name} - if host is not None: args['host'] = host - if source is not None: args['source'] = source - if sourcetype is not None: args['sourcetype'] = sourcetype - path = UrlEncoded(PATH_RECEIVERS_STREAM + "?" + parse.urlencode(args), skip_encode=True) + args = {"index": self.name} + if host is not None: + args["host"] = host + if source is not None: + args["source"] = source + if sourcetype is not None: + args["sourcetype"] = sourcetype + path = UrlEncoded( + PATH_RECEIVERS_STREAM + "?" + parse.urlencode(args), skip_encode=True + ) - cookie_header = self.service.token if self.service.token is _NoAuthenticationToken else self.service.token.replace("Splunk ", "") + cookie_header = ( + self.service.token + if self.service.token is _NoAuthenticationToken + else self.service.token.replace("Splunk ", "") + ) cookie_or_auth_header = f"Authorization: Splunk {cookie_header}\r\n" # If we have cookie(s), use them instead of "Authorization: ..." @@ -2102,12 +2182,14 @@ def attach(self, host=None, source=None, sourcetype=None): # the connection open and use the Splunk extension headers to note # the input mode sock = self.service.connect() - headers = [f"POST {str(self.service._abspath(path))} HTTP/1.1\r\n".encode('utf-8'), - f"Host: {self.service.host}:{int(self.service.port)}\r\n".encode('utf-8'), - b"Accept-Encoding: identity\r\n", - cookie_or_auth_header.encode('utf-8'), - b"X-Splunk-Input-Mode: Streaming\r\n", - b"\r\n"] + headers = [ + f"POST {str(self.service._abspath(path))} HTTP/1.1\r\n".encode("utf-8"), + f"Host: {self.service.host}:{int(self.service.port)}\r\n".encode("utf-8"), + b"Accept-Encoding: identity\r\n", + cookie_or_auth_header.encode("utf-8"), + b"X-Splunk-Input-Mode: Streaming\r\n", + b"\r\n", + ] for h in headers: sock.write(h) @@ -2161,8 +2243,8 @@ def clean(self, timeout=60): """ self.refresh() - tds = self['maxTotalDataSizeMB'] - ftp = self['frozenTimePeriodInSecs'] + tds = self["maxTotalDataSizeMB"] + ftp = self["frozenTimePeriodInSecs"] was_disabled_initially = self.disabled try: if not was_disabled_initially and self.service.splunk_version < (5,): @@ -2175,13 +2257,14 @@ def clean(self, timeout=60): # Wait until event count goes to 0. start = datetime.now() diff = timedelta(seconds=timeout) - while self.content.totalEventCount != '0' and datetime.now() < start + diff: + while self.content.totalEventCount != "0" and datetime.now() < start + diff: sleep(1) self.refresh() - if self.content.totalEventCount != '0': + if self.content.totalEventCount != "0": raise OperationError( - f"Cleaning index {self.name} took longer than {timeout} seconds; timing out.") + f"Cleaning index {self.name} took longer than {timeout} seconds; timing out." + ) finally: # Restore original values self.update(maxTotalDataSizeMB=tds, frozenTimePeriodInSecs=ftp) @@ -2213,10 +2296,13 @@ def submit(self, event, host=None, source=None, sourcetype=None): :return: The :class:`Index`. 
""" - args = {'index': self.name} - if host is not None: args['host'] = host - if source is not None: args['source'] = source - if sourcetype is not None: args['sourcetype'] = sourcetype + args = {"index": self.name} + if host is not None: + args["host"] = host + if source is not None: + args["source"] = source + if sourcetype is not None: + args["sourcetype"] = sourcetype self.service.post(PATH_RECEIVERS_SIMPLE, body=event, **args) return self @@ -2236,8 +2322,8 @@ def upload(self, filename, **kwargs): :return: The :class:`Index`. """ - kwargs['index'] = self.name - path = 'data/inputs/oneshot' + kwargs["index"] = self.name + path = "data/inputs/oneshot" self.service.post(path, name=filename, **kwargs) return self @@ -2255,20 +2341,20 @@ def __init__(self, service, path, kind=None, **kwargs): # and "splunktcp" (which is "tcp/cooked"). Entity.__init__(self, service, path, **kwargs) if kind is None: - path_segments = path.split('/') - i = path_segments.index('inputs') + 1 - if path_segments[i] == 'tcp': - self.kind = path_segments[i] + '/' + path_segments[i + 1] + path_segments = path.split("/") + i = path_segments.index("inputs") + 1 + if path_segments[i] == "tcp": + self.kind = path_segments[i] + "/" + path_segments[i + 1] else: self.kind = path_segments[i] else: self.kind = kind # Handle old input kind names. - if self.kind == 'tcp': - self.kind = 'tcp/raw' - if self.kind == 'splunktcp': - self.kind = 'tcp/cooked' + if self.kind == "tcp": + self.kind = "tcp/raw" + if self.kind == "splunktcp": + self.kind = "tcp/cooked" def update(self, **kwargs): """Updates the server with any changes you've made to the current input @@ -2283,7 +2369,7 @@ def update(self, **kwargs): """ # UDP and TCP inputs require special handling due to their restrictToHost # field. For all other inputs kinds, we can dispatch to the superclass method. - if self.kind not in ['tcp', 'splunktcp', 'tcp/raw', 'tcp/cooked', 'udp']: + if self.kind not in ["tcp", "splunktcp", "tcp/raw", "tcp/cooked", "udp"]: return super().update(**kwargs) else: # The behavior of restrictToHost is inconsistent across input kinds and versions of Splunk. @@ -2300,10 +2386,12 @@ def update(self, **kwargs): # cause it to change in Splunk 5.0 and 5.0.1. to_update = kwargs.copy() - if 'restrictToHost' in kwargs: - raise IllegalOperationException("Cannot set restrictToHost on an existing input with the SDK.") - if 'restrictToHost' in self._state.content and self.kind != 'udp': - to_update['restrictToHost'] = self._state.content['restrictToHost'] + if "restrictToHost" in kwargs: + raise IllegalOperationException( + "Cannot set restrictToHost on an existing input with the SDK." + ) + if "restrictToHost" in self._state.content and self.kind != "udp": + to_update["restrictToHost"] = self._state.content["restrictToHost"] # Do the actual update operation. return super().update(**to_update) @@ -2333,7 +2421,9 @@ def __getitem__(self, key): response = self.get(self.kindpath(kind) + "/" + key) entries = self._load_list(response) if len(entries) > 1: - raise AmbiguousReferenceException(f"Found multiple inputs of kind {kind} named {key}.") + raise AmbiguousReferenceException( + f"Found multiple inputs of kind {kind} named {key}." 
+ ) if len(entries) == 0: raise KeyError((key, kind)) return entries[0] @@ -2352,13 +2442,18 @@ def __getitem__(self, key): response = self.get(kind + "/" + key) entries = self._load_list(response) if len(entries) > 1: - raise AmbiguousReferenceException(f"Found multiple inputs of kind {kind} named {key}.") + raise AmbiguousReferenceException( + f"Found multiple inputs of kind {kind} named {key}." + ) if len(entries) == 0: pass else: - if candidate is not None: # Already found at least one candidate + if ( + candidate is not None + ): # Already found at least one candidate raise AmbiguousReferenceException( - f"Found multiple inputs named {key}, please specify a kind") + f"Found multiple inputs named {key}, please specify a kind" + ) candidate = entries[0] except HTTPError as he: if he.status == 404: @@ -2441,7 +2536,9 @@ def create(self, name, kind, **kwargs): name = UrlEncoded(name, encode_slash=True) path = _path( self.path + kindpath, - f"{kwargs['restrictToHost']}:{name}" if 'restrictToHost' in kwargs else name + f"{kwargs['restrictToHost']}:{name}" + if "restrictToHost" in kwargs + else name, ) return Input(self.service, path, kind) @@ -2521,16 +2618,16 @@ def _get_kind_list(self, subpath=None): subpath = [] kinds = [] - response = self.get('/'.join(subpath)) + response = self.get("/".join(subpath)) content = _load_atom_entries(response) for entry in content: this_subpath = subpath + [entry.title] # The "all" endpoint doesn't work yet. # The "tcp/ssl" endpoint is not a real input collection. - if entry.title == 'all' or this_subpath == ['tcp', 'ssl']: + if entry.title == "all" or this_subpath == ["tcp", "ssl"]: continue - if 'create' in [x.rel for x in entry.link]: - path = '/'.join(subpath + [entry.title]) + if "create" in [x.rel for x in entry.link]: + path = "/".join(subpath + [entry.title]) kinds.append(path) else: subkinds = self._get_kind_list(subpath + [entry.title]) @@ -2576,10 +2673,10 @@ def kindpath(self, kind): :return: The relative endpoint path. 
:rtype: ``string`` """ - if kind == 'tcp': - return UrlEncoded('tcp/raw', skip_encode=True) - if kind == 'splunktcp': - return UrlEncoded('tcp/cooked', skip_encode=True) + if kind == "tcp": + return UrlEncoded("tcp/raw", skip_encode=True) + if kind == "splunktcp": + return UrlEncoded("tcp/cooked", skip_encode=True) return UrlEncoded(kind, skip_encode=True) def list(self, *kinds, **kwargs): @@ -2664,7 +2761,7 @@ def list(self, *kinds, **kwargs): entities.append(entity) return entities - search = kwargs.get('search', '*') + search = kwargs.get("search", "*") entities = [] for kind in kinds: @@ -2679,7 +2776,8 @@ def list(self, *kinds, **kwargs): raise entries = _load_atom_entries(response) - if entries is None: continue # No inputs to process + if entries is None: + continue # No inputs to process for entry in entries: state = _parse_atom_entry(entry) # Unquote the URL, since all URL encoded in the SDK @@ -2688,25 +2786,25 @@ def list(self, *kinds, **kwargs): path = parse.unquote(state.links.alternate) entity = Input(self.service, path, kind, state=state) entities.append(entity) - if 'offset' in kwargs: - entities = entities[kwargs['offset']:] - if 'count' in kwargs: - entities = entities[:kwargs['count']] - if kwargs.get('sort_mode', None) == 'alpha': - sort_field = kwargs.get('sort_field', 'name') - if sort_field == 'name': + if "offset" in kwargs: + entities = entities[kwargs["offset"] :] + if "count" in kwargs: + entities = entities[: kwargs["count"]] + if kwargs.get("sort_mode", None) == "alpha": + sort_field = kwargs.get("sort_field", "name") + if sort_field == "name": f = lambda x: x.name.lower() else: f = lambda x: x[sort_field].lower() entities = sorted(entities, key=f) - if kwargs.get('sort_mode', None) == 'alpha_case': - sort_field = kwargs.get('sort_field', 'name') - if sort_field == 'name': + if kwargs.get("sort_mode", None) == "alpha_case": + sort_field = kwargs.get("sort_field", "name") + if sort_field == "name": f = lambda x: x.name else: f = lambda x: x[sort_field] entities = sorted(entities, key=f) - if kwargs.get('sort_dir', 'asc') == 'desc': + if kwargs.get("sort_dir", "asc") == "desc": entities = list(reversed(entities)) return entities @@ -2715,7 +2813,7 @@ def __iter__(self, **kwargs): yield item def iter(self, **kwargs): - """ Iterates over the collection of inputs. + """Iterates over the collection of inputs. :param kwargs: Additional arguments (optional): @@ -2739,7 +2837,7 @@ def iter(self, **kwargs): yield item def oneshot(self, path, **kwargs): - """ Creates a oneshot data input, which is an upload of a single file + """Creates a oneshot data input, which is an upload of a single file for one-time indexing. :param path: The path and filename. @@ -2748,7 +2846,7 @@ def oneshot(self, path, **kwargs): available parameters, see `Input parameters `_ on Splunk Developer Portal. :type kwargs: ``dict`` """ - self.post('oneshot', name=path, **kwargs) + self.post("oneshot", name=path, **kwargs) class Job(Entity): @@ -2815,7 +2913,7 @@ def events(self, **kwargs): :return: The ``InputStream`` IO handle to this job's events. 
""" - kwargs['segmentation'] = kwargs.get('segmentation', 'none') + kwargs["segmentation"] = kwargs.get("segmentation", "none") # Search API v1(GET) and v2(POST) if self.service.disable_v2_api: @@ -2838,7 +2936,7 @@ def is_done(self): """ if not self.is_ready(): return False - done = (self._state.content['isDone'] == '1') + done = self._state.content["isDone"] == "1" return done def is_ready(self): @@ -2852,7 +2950,7 @@ def is_ready(self): if response.status == 204: return False self._state = self.read(response) - ready = self._state.content['dispatchState'] not in ['QUEUED', 'PARSING'] + ready = self._state.content["dispatchState"] not in ["QUEUED", "PARSING"] return ready @property @@ -2907,7 +3005,7 @@ def results(self, **query_params): :return: The ``InputStream`` IO handle to this job's results. """ - query_params['segmentation'] = query_params.get('segmentation', 'none') + query_params["segmentation"] = query_params.get("segmentation", "none") # Search API v1(GET) and v2(POST) if self.service.disable_v2_api: @@ -2952,7 +3050,7 @@ def preview(self, **query_params): :return: The ``InputStream`` IO handle to this job's preview results. """ - query_params['segmentation'] = query_params.get('segmentation', 'none') + query_params["segmentation"] = query_params.get("segmentation", "none") # Search API v1(GET) and v2(POST) if self.service.disable_v2_api: @@ -2983,7 +3081,7 @@ def set_priority(self, value): :return: The :class:`Job`. """ - self.post('control', action="setpriority", priority=value) + self.post("control", action="setpriority", priority=value) return self def summary(self, **kwargs): @@ -3060,19 +3158,17 @@ def __init__(self, service): def _load_list(self, response): # Overridden because Job takes a sid instead of a path. entries = _load_atom_entries(response) - if entries is None: return [] + if entries is None: + return [] entities = [] for entry in entries: state = _parse_atom_entry(entry) - entity = self.item( - self.service, - entry['content']['sid'], - state=state) + entity = self.item(self.service, entry["content"]["sid"], state=state) entities.append(entity) return entities def create(self, query, **kwargs): - """ Creates a search using a search query and any additional parameters + """Creates a search using a search query and any additional parameters you provide. :param query: The search query. @@ -3086,7 +3182,9 @@ def create(self, query, **kwargs): :return: The :class:`Job`. """ if kwargs.get("exec_mode", None) == "oneshot": - raise TypeError("Cannot specify exec_mode=oneshot; use the oneshot method instead.") + raise TypeError( + "Cannot specify exec_mode=oneshot; use the oneshot method instead." + ) response = self.post(search=query, **kwargs) sid = _load_sid(response, kwargs.get("output_mode", None)) return Job(self.service, sid) @@ -3132,10 +3230,8 @@ def export(self, query, **params): """ if "exec_mode" in params: raise TypeError("Cannot specify an exec_mode to export.") - params['segmentation'] = params.get('segmentation', 'none') - return self.post(path_segment="export", - search=query, - **params).body + params["segmentation"] = params.get("segmentation", "none") + return self.post(path_segment="export", search=query, **params).body def itemmeta(self): """There is no metadata available for class:``Jobs``. 
@@ -3195,10 +3291,8 @@ def oneshot(self, query, **params): """ if "exec_mode" in params: raise TypeError("Cannot specify an exec_mode to oneshot.") - params['segmentation'] = params.get('segmentation', 'none') - return self.post(search=query, - exec_mode="oneshot", - **params).body + params["segmentation"] = params.get("segmentation", "none") + return self.post(search=query, exec_mode="oneshot", **params).body class Loggers(Collection): @@ -3238,15 +3332,15 @@ class ModularInputKind(Entity): """ def __contains__(self, name): - args = self.state.content['endpoints']['args'] + args = self.state.content["endpoints"]["args"] if name in args: return True return Entity.__contains__(self, name) def __getitem__(self, name): - args = self.state.content['endpoint']['args'] + args = self.state.content["endpoint"]["args"] if name in args: - return args['item'] + return args["item"] return Entity.__getitem__(self, name) @property @@ -3263,11 +3357,13 @@ def arguments(self): :return: A dictionary describing the arguments this modular input kind takes. :rtype: ``dict`` """ - return self.state.content['endpoint']['args'] + return self.state.content["endpoint"]["args"] def update(self, **kwargs): """Raises an error. Modular input kinds are read only.""" - raise IllegalOperationException("Modular input kinds cannot be updated via the REST API.") + raise IllegalOperationException( + "Modular input kinds cannot be updated via the REST API." + ) class SavedSearch(Entity): @@ -3292,7 +3388,7 @@ def alert_count(self): :return: The number of alerts fired by this saved search. :rtype: ``integer`` """ - return int(self._state.content.get('triggered_alert_count', 0)) + return int(self._state.content.get("triggered_alert_count", 0)) def dispatch(self, **kwargs): """Runs the saved search and returns the resulting search job. @@ -3318,15 +3414,20 @@ def fired_alerts(self): :return: A collection of fired alerts. :rtype: :class:`AlertGroup` """ - if self['is_scheduled'] == '0': - raise IllegalOperationException('Unscheduled saved searches have no alerts.') + if self["is_scheduled"] == "0": + raise IllegalOperationException( + "Unscheduled saved searches have no alerts." + ) c = Collection( self.service, - self.service._abspath(PATH_FIRED_ALERTS + self.name, - owner=self._state.access.owner, - app=self._state.access.app, - sharing=self._state.access.sharing), - item=AlertGroup) + self.service._abspath( + PATH_FIRED_ALERTS + self.name, + owner=self._state.access.owner, + app=self._state.access.app, + sharing=self._state.access.sharing, + ), + item=AlertGroup, + ) return c def history(self, **kwargs): @@ -3339,7 +3440,8 @@ def history(self, **kwargs): """ response = self.get("history", **kwargs) entries = _load_atom_entries(response) - if entries is None: return [] + if entries is None: + return [] jobs = [] for entry in entries: job = Job(self.service, entry.title) @@ -3363,11 +3465,12 @@ def update(self, search=None, **kwargs): # Updates to a saved search *require* that the search string be # passed, so we pass the current search string if a value wasn't # provided by the caller. - if search is None: search = self.content.search + if search is None: + search = self.content.search Entity.update(self, search=search, **kwargs) return self - def scheduled_times(self, earliest_time='now', latest_time='+1h'): + def scheduled_times(self, earliest_time="now", latest_time="+1h"): """Returns the times when this search is scheduled to run. By default this method returns the times in the next hour. 
For different @@ -3382,13 +3485,12 @@ def scheduled_times(self, earliest_time='now', latest_time='+1h'): :return: The list of search times. """ - response = self.get("scheduled_times", - earliest_time=earliest_time, - latest_time=latest_time) + response = self.get( + "scheduled_times", earliest_time=earliest_time, latest_time=latest_time + ) data = self._load_atom_entry(response) rec = _parse_atom_entry(data) - times = [datetime.fromtimestamp(int(t)) - for t in rec.content.scheduled_times] + times = [datetime.fromtimestamp(int(t)) for t in rec.content.scheduled_times] return times def suppress(self, expiration): @@ -3430,11 +3532,10 @@ class SavedSearches(Collection): collection using :meth:`Service.saved_searches`.""" def __init__(self, service): - Collection.__init__( - self, service, PATH_SAVED_SEARCHES, item=SavedSearch) + Collection.__init__(self, service, PATH_SAVED_SEARCHES, item=SavedSearch) def create(self, name, search, **kwargs): - """ Creates a saved search. + """Creates a saved search. :param name: The name for the saved search. :type name: ``string`` @@ -3452,6 +3553,7 @@ def create(self, name, search, **kwargs): class Macro(Entity): """This class represents a search macro.""" + def __init__(self, service, path, **kwargs): Entity.__init__(self, service, path, **kwargs) @@ -3461,7 +3563,7 @@ def args(self): :return: The macro arguments. :rtype: ``string`` """ - return self._state.content.get('args', '') + return self._state.content.get("args", "") @property def definition(self): @@ -3469,7 +3571,7 @@ def definition(self): :return: The macro definition. :rtype: ``string`` """ - return self._state.content.get('definition', '') + return self._state.content.get("definition", "") @property def errormsg(self): @@ -3477,7 +3579,7 @@ def errormsg(self): :return: The validation error message for the macro. :rtype: ``string`` """ - return self._state.content.get('errormsg', '') + return self._state.content.get("errormsg", "") @property def iseval(self): @@ -3485,7 +3587,7 @@ def iseval(self): :return: The iseval value for the macro. :rtype: ``string`` """ - return self._state.content.get('iseval', '0') + return self._state.content.get("iseval", "0") def update(self, definition=None, **kwargs): """Updates the server with any changes you've made to the current macro @@ -3500,7 +3602,8 @@ def update(self, definition=None, **kwargs): # Updates to a macro *require* that the definition be # passed, so we pass the current definition if a value wasn't # provided by the caller. - if definition is None: definition = self.content.definition + if definition is None: + definition = self.content.definition Entity.update(self, definition=definition, **kwargs) return self @@ -3510,18 +3613,18 @@ def validation(self): :return: The validation expression for the macro. :rtype: ``string`` """ - return self._state.content.get('validation', '') + return self._state.content.get("validation", "") class Macros(Collection): """This class represents a collection of macros. Retrieve this collection using :meth:`Service.macros`.""" + def __init__(self, service): - Collection.__init__( - self, service, PATH_MACROS, item=Macro) + Collection.__init__(self, service, PATH_MACROS, item=Macro) def create(self, name, definition, **kwargs): - """ Creates a macro. + """Creates a macro. :param name: The name for the macro. :type name: ``string`` :param definition: The macro definition. @@ -3557,8 +3660,7 @@ def update(self, **kwargs): class User(Entity): - """This class represents a Splunk user. 
- """ + """This class represents a Splunk user.""" @property def role_entities(self): @@ -3568,7 +3670,11 @@ def role_entities(self): :rtype: ``list`` """ all_role_names = [r.name for r in self.service.roles.list()] - return [self.service.roles[name] for name in self.content.roles if name in all_role_names] + return [ + self.service.roles[name] + for name in self.content.roles + if name in all_role_names + ] # Splunk automatically lowercases new user names so we need to match that @@ -3627,13 +3733,12 @@ def create(self, username, password, roles, **params): entry = _load_atom(response, XNAME_ENTRY).entry state = _parse_atom_entry(entry) entity = self.item( - self.service, - parse.unquote(state.links.alternate), - state=state) + self.service, parse.unquote(state.links.alternate), state=state + ) return entity def delete(self, name): - """ Deletes the user and returns the resulting collection of users. + """Deletes the user and returns the resulting collection of users. :param name: The name of the user to delete. :type name: ``string`` @@ -3645,8 +3750,7 @@ def delete(self, name): class Role(Entity): - """This class represents a user role. - """ + """This class represents a user role.""" def grant(self, *capabilities_to_grant): """Grants additional capabilities to this role. @@ -3668,7 +3772,7 @@ def grant(self, *capabilities_to_grant): for capability in capabilities_to_grant: if capability not in possible_capabilities: raise NoSuchCapability(capability) - new_capabilities = self['capabilities'] + list(capabilities_to_grant) + new_capabilities = self["capabilities"] + list(capabilities_to_grant) self.post(capabilities=new_capabilities) return self @@ -3693,13 +3797,13 @@ def revoke(self, *capabilities_to_revoke): for capability in capabilities_to_revoke: if capability not in possible_capabilities: raise NoSuchCapability(capability) - old_capabilities = self['capabilities'] + old_capabilities = self["capabilities"] new_capabilities = [] for c in old_capabilities: if c not in capabilities_to_revoke: new_capabilities.append(c) if not new_capabilities: - new_capabilities = '' # Empty lists don't get passed in the body, so we have to force an empty argument. + new_capabilities = "" # Empty lists don't get passed in the body, so we have to force an empty argument. self.post(capabilities=new_capabilities) return self @@ -3752,13 +3856,12 @@ def create(self, name, **params): entry = _load_atom(response, XNAME_ENTRY).entry state = _parse_atom_entry(entry) entity = self.item( - self.service, - parse.unquote(state.links.alternate), - state=state) + self.service, parse.unquote(state.links.alternate), state=state + ) return entity def delete(self, name): - """ Deletes the role and returns the resulting collection of roles. + """Deletes the role and returns the resulting collection of roles. :param name: The name of the role to delete. :type name: ``string`` @@ -3777,10 +3880,10 @@ def setupInfo(self): :return: The setup information. 
""" - return self.content.get('eai:setup', None) + return self.content.get("eai:setup", None) def package(self): - """ Creates a compressed package of the app for archiving.""" + """Creates a compressed package of the app for archiving.""" return self._run_action("package") def updateInfo(self): @@ -3790,7 +3893,9 @@ def updateInfo(self): class KVStoreCollections(Collection): def __init__(self, service): - Collection.__init__(self, service, 'storage/collections/config', item=KVStoreCollection) + Collection.__init__( + self, service, "storage/collections/config", item=KVStoreCollection + ) def __getitem__(self, item): res = Collection.__getitem__(self, item) @@ -3816,9 +3921,9 @@ def create(self, name, accelerated_fields={}, fields={}, **kwargs): for k, v in accelerated_fields.items(): if isinstance(v, dict): v = json.dumps(v) - kwargs['accelerated_fields.' + k] = v + kwargs["accelerated_fields." + k] = v for k, v in fields.items(): - kwargs['field.' + k] = v + kwargs["field." + k] = v return self.post(name=name, **kwargs) @@ -3842,7 +3947,9 @@ def update_accelerated_field(self, name, value): :return: Result of POST request """ kwargs = {} - kwargs['accelerated_fields.' + name] = json.dumps(value) if isinstance(value, dict) else value + kwargs["accelerated_fields." + name] = ( + json.dumps(value) if isinstance(value, dict) else value + ) return self.post(**kwargs) def update_field(self, name, value): @@ -3856,7 +3963,7 @@ def update_field(self, name, value): :return: Result of POST request """ kwargs = {} - kwargs['field.' + name] = value + kwargs["field." + name] = value return self.post(**kwargs) @@ -3865,22 +3972,45 @@ class KVStoreCollectionData: Retrieve using :meth:`KVStoreCollection.data` """ - JSON_HEADER = [('Content-Type', 'application/json')] + + JSON_HEADER = [("Content-Type", "application/json")] def __init__(self, collection): self.service = collection.service self.collection = collection self.owner, self.app, self.sharing = collection._proper_namespace() - self.path = 'storage/collections/data/' + UrlEncoded(self.collection.name, encode_slash=True) + '/' + self.path = ( + "storage/collections/data/" + + UrlEncoded(self.collection.name, encode_slash=True) + + "/" + ) def _get(self, url, **kwargs): - return self.service.get(self.path + url, owner=self.owner, app=self.app, sharing=self.sharing, **kwargs) + return self.service.get( + self.path + url, + owner=self.owner, + app=self.app, + sharing=self.sharing, + **kwargs, + ) def _post(self, url, **kwargs): - return self.service.post(self.path + url, owner=self.owner, app=self.app, sharing=self.sharing, **kwargs) + return self.service.post( + self.path + url, + owner=self.owner, + app=self.app, + sharing=self.sharing, + **kwargs, + ) def _delete(self, url, **kwargs): - return self.service.delete(self.path + url, owner=self.owner, app=self.app, sharing=self.sharing, **kwargs) + return self.service.delete( + self.path + url, + owner=self.owner, + app=self.app, + sharing=self.sharing, + **kwargs, + ) def query(self, **query): """ @@ -3897,7 +4027,7 @@ def query(self, **query): if isinstance(query[key], dict): query[key] = json.dumps(value) - return json.loads(self._get('', **query).body.read().decode('utf-8')) + return json.loads(self._get("", **query).body.read().decode("utf-8")) def query_by_id(self, id): """ @@ -3909,7 +4039,11 @@ def query_by_id(self, id): :return: Document with id :rtype: ``dict`` """ - return json.loads(self._get(UrlEncoded(str(id), encode_slash=True)).body.read().decode('utf-8')) + return json.loads( + 
self._get(UrlEncoded(str(id), encode_slash=True)) + .body.read() + .decode("utf-8") + ) def insert(self, data): """ @@ -3924,7 +4058,10 @@ def insert(self, data): if isinstance(data, dict): data = json.dumps(data) return json.loads( - self._post('', headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read().decode('utf-8')) + self._post("", headers=KVStoreCollectionData.JSON_HEADER, body=data) + .body.read() + .decode("utf-8") + ) def delete(self, query=None): """ @@ -3935,7 +4072,7 @@ def delete(self, query=None): :return: Result of DELETE request """ - return self._delete('', **({'query': query}) if query else {}) + return self._delete("", **({"query": query}) if query else {}) def delete_by_id(self, id): """ @@ -3962,8 +4099,15 @@ def update(self, id, data): """ if isinstance(data, dict): data = json.dumps(data) - return json.loads(self._post(UrlEncoded(str(id), encode_slash=True), headers=KVStoreCollectionData.JSON_HEADER, - body=data).body.read().decode('utf-8')) + return json.loads( + self._post( + UrlEncoded(str(id), encode_slash=True), + headers=KVStoreCollectionData.JSON_HEADER, + body=data, + ) + .body.read() + .decode("utf-8") + ) def batch_find(self, *dbqueries): """ @@ -3976,12 +4120,17 @@ def batch_find(self, *dbqueries): :rtype: ``array`` of ``array`` """ if len(dbqueries) < 1: - raise Exception('Must have at least one query.') + raise Exception("Must have at least one query.") data = json.dumps(dbqueries) return json.loads( - self._post('batch_find', headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read().decode('utf-8')) + self._post( + "batch_find", headers=KVStoreCollectionData.JSON_HEADER, body=data + ) + .body.read() + .decode("utf-8") + ) def batch_save(self, *documents): """ @@ -3994,9 +4143,14 @@ def batch_save(self, *documents): :rtype: ``dict`` """ if len(documents) < 1: - raise Exception('Must have at least one document.') + raise Exception("Must have at least one document.") data = json.dumps(documents) return json.loads( - self._post('batch_save', headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read().decode('utf-8')) \ No newline at end of file + self._post( + "batch_save", headers=KVStoreCollectionData.JSON_HEADER, body=data + ) + .body.read() + .decode("utf-8") + )
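
Reviewer note: the KV store hunks above are the largest reformatting block in this file, so a short end-to-end sketch of the collection and data APIs follows for reference. The collection name, field names, and credentials below are invented for illustration; the behaviour shown is the one described in the docstrings above.

import json
import splunklib.client as client

# Placeholder connection and namespace -- KV store collections are app-scoped.
service = client.connect(host="localhost", port=8089,
                         username="admin", password="changeme",
                         owner="nobody", app="search")

# KVStoreCollections.create() JSON-encodes nested accelerated_fields values
# and prefixes plain fields with "field.", as in the hunk above.
service.kvstore.create("sdk_demo_kv",
                       fields={"title": "string", "count": "number"},
                       accelerated_fields={"by_title": {"title": 1}})
coll = service.kvstore["sdk_demo_kv"]
coll.update_field("owner_team", "string")
coll.update_accelerated_field("by_count", {"count": -1})

data = coll.data

# insert()/update() accept a dict (JSON-encoded before posting) or a
# pre-encoded string, and return the decoded JSON response.
key = data.insert({"title": "first", "count": 1})["_key"]
data.update(key, {"title": "first", "count": 2})
print(data.query_by_id(key))

# batch_save() upserts whole documents; batch_find() runs several queries
# in one request and returns a list of result lists.
data.batch_save({"title": "second", "count": 3},
                {"title": "third", "count": 4})
print(data.batch_find({"query": {"count": {"$gt": 1}}}))

# query() JSON-encodes dict-valued arguments; delete() takes a JSON query
# string, or use delete_by_id() for a single document.
print(data.query(query={"count": {"$gt": 1}}, limit=2))
data.delete(json.dumps({"count": {"$lt": 10}}))
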