From 9b08282668038373859b5269335f83fe9d86d995 Mon Sep 17 00:00:00 2001 From: Joris Geysens Date: Mon, 3 Feb 2020 10:01:34 +0100 Subject: [PATCH 1/7] Fix : uninitialized data using invalid protocol. - raise proper exception when send_packet is called with invalid protocol - fix missing 'optional' flag for query() parameters - fix shadowed parameters response and filter - add extra msgpack test starting from json with int, float and string values - default to old behaviour for client (don't use msgpack by default - see failing test) --- influxdb/client.py | 85 ++++++++----- influxdb/resultset.py | 4 +- influxdb/tests/client_test.py | 95 +++++++++++++- influxdb/tests/server_tests/base.py | 32 +++-- .../server_tests/client_test_with_server.py | 118 ++++++++++++++++-- 5 files changed, 281 insertions(+), 53 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 5e39f490..3c8bb14a 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -89,6 +89,7 @@ def __init__(self, pool_size=10, path='', cert=None, + use_msgpack=False ): """Construct a new InfluxDBClient object.""" self.__host = host @@ -110,7 +111,9 @@ def __init__(self, ) if use_udp: - self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + else: + self._udp_socket = None if not path: self.__path = '' @@ -145,10 +148,16 @@ def __init__(self, self._port, self._path) - self._headers = { - 'Content-Type': 'application/json', - 'Accept': 'application/x-msgpack' - } + if use_msgpack: + self._headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/x-msgpack' + } + else: + self._headers = { + 'Content-Type': 'application/json', + 'Accept': 'text/plain' + } @property def _baseurl(self): @@ -243,14 +252,14 @@ def request(self, url, method='GET', params=None, data=None, :param method: the HTTP method for the request, defaults to GET :type method: str :param params: additional parameters for the request, defaults to None - :type params: dict + :type params: dict, optional :param data: the data of the request, defaults to None - :type data: str + :type data: str, optional :param expected_response_code: the expected response code of the request, defaults to 200 :type expected_response_code: int :param headers: headers to add to the request - :type headers: dict + :type headers: dict, optional :returns: the response from the request :rtype: :class:`requests.Response` :raises InfluxDBServerError: if the response code is any server error @@ -285,6 +294,7 @@ def request(self, url, method='GET', params=None, data=None, verify=self._verify_ssl, timeout=self._timeout ) + response._msgpack = None break except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, @@ -297,30 +307,39 @@ def request(self, url, method='GET', params=None, data=None, if not retry: raise - type_header = response.headers and response.headers.get("Content-Type") - if type_header == "application/x-msgpack" and response.content: - response._msgpack = msgpack.unpackb( - packed=response.content, - ext_hook=_msgpack_parse_hook, - raw=False) - else: - response._msgpack = None + if self._is_msg_pack_response(response): + if response.content: + response._msgpack = msgpack.unpackb( + packed=response.content, + ext_hook=_msgpack_parse_hook, + raw=False) - def reformat_error(response): - if response._msgpack: - return json.dumps(response._msgpack, separators=(',', ':')) - else: - return response.content - - # if there's not an error, there must have been a successful response - if 500 <= response.status_code < 600: - raise InfluxDBServerError(reformat_error(response)) - elif response.status_code == expected_response_code: + if response.status_code == expected_response_code: return response else: - err_msg = reformat_error(response) + err_msg = self._reformat_msgpack_error(response) raise InfluxDBClientError(err_msg, response.status_code) + @staticmethod + def _is_msg_pack_response(response): + if response is None: + return False + + if response.headers is None: + return False + + if "Content-Type" not in response.headers: + return False + + content_type = response.headers["Content-Type"] + return content_type == "application/x-msgpack" + + def _reformat_msgpack_error(self, _response): + if _response._msgpack is not None: + return json.dumps(_response._msgpack, separators=(',', ':')) + else: + return _response.content + def write(self, data, params=None, expected_response_code=204, protocol='json'): """Write data to InfluxDB. @@ -1071,7 +1090,7 @@ def drop_continuous_query(self, name, database=None): self.query(query_string) def send_packet(self, packet, protocol='json', time_precision=None): - """Send an UDP packet. + """Send an UDP packet. Only valid when use_udp is True. :param packet: the packet to be sent :type packet: (if protocol is 'json') dict @@ -1081,11 +1100,19 @@ def send_packet(self, packet, protocol='json', time_precision=None): :param time_precision: Either 's', 'm', 'ms' or 'u', defaults to None :type time_precision: str """ + + if not self._use_udp: + raise RuntimeError("Unable to send packet : use_udp set to False") + if protocol == 'json': data = make_lines(packet, time_precision).encode('utf-8') elif protocol == 'line': data = ('\n'.join(packet) + '\n').encode('utf-8') - self.udp_socket.sendto(data, (self._host, self._udp_port)) + else: + raise InfluxDBClientError("Invalid protocol name : " + "expected json or line") + + self._udp_socket.sendto(data, (self._host, self._udp_port)) def close(self): """Close http session.""" diff --git a/influxdb/resultset.py b/influxdb/resultset.py index ba4f3c13..c2010ce6 100644 --- a/influxdb/resultset.py +++ b/influxdb/resultset.py @@ -129,9 +129,9 @@ def __iter__(self): yield list(self.__getitem__(key)) @staticmethod - def _tag_matches(tags, filter): + def _tag_matches(tags, _filter): """Check if all key/values in filter match in tags.""" - for tag_name, tag_value in filter.items(): + for tag_name, tag_value in _filter.items(): # using _sentinel as I'm not sure that "None" # could be used, because it could be a valid # series_tags value : when a series has no such tag diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 54116f7e..a6ab2605 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -26,6 +26,7 @@ import json import mock +import msgpack import requests import requests.exceptions import requests_mock @@ -33,6 +34,7 @@ from nose.tools import raises from influxdb import InfluxDBClient +from influxdb.exceptions import InfluxDBClientError from influxdb.resultset import ResultSet @@ -77,12 +79,17 @@ def request(*args, **kwargs): class TestInfluxDBClient(unittest.TestCase): """Set up the TestInfluxDBClient object.""" + @staticmethod + def _create_new_client(use_msgpack=False): + return InfluxDBClient('localhost', 8086, 'username', 'password', + use_msgpack=use_msgpack) + def setUp(self): """Initialize an instance of TestInfluxDBClient object.""" # By default, raise exceptions on warnings warnings.simplefilter('error', FutureWarning) - self.cli = InfluxDBClient('localhost', 8086, 'username', 'password') + self.cli = self._create_new_client() self.dummy_points = [ { "measurement": "cpu_load_short", @@ -496,6 +503,78 @@ def test_query_msgpack(self): [{'v': 1.0, 'time': '2019-07-10T16:51:22.026253Z'}] ) + def test_query_msgpack_extended(self): + """Test query method with a msgpack response.""" + + example_response_obj = { + "results": [ + { + "series": [ + { + "measurement": "sdfsdfsdf", + "columns": [ + "time", + "value_float", + "value_int", + "value_string" + ], + "values": [ + [ + "2009-11-10T23:00:00Z", + 0.64, + 2, + "some value" + ] + ] + } + ] + }, + { + "series": [ + { + "measurement": "cpu_load_short", + "columns": [ + "time", + "value" + ], + "values": [ + [ + "2020-01-10T23:00:00Z", + -0.645468546312 + ] + ] + } + ] + } + ] + } + + example_response_packed = msgpack.packb(example_response_obj) + + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/query", + request_headers={"Accept": "application/x-msgpack"}, + headers={"Content-Type": "application/x-msgpack"}, + content=example_response_packed + ) + + rs = self.cli.query('select * from a') + + self.assertListEqual( + list(rs[0].get_points()), + [{'value_float': 0.64, + 'value_int': 2, + 'value_string': "some value", + 'time': '2009-11-10T23:00:00Z'}] + ) + + self.assertListEqual( + list(rs[1].get_points()), + [{'value': -0.645468546312, 'time': '2020-01-10T23:00:00Z'}] + ) + def test_select_into_post(self): """Test SELECT.*INTO is POSTed.""" example_response = ( @@ -1278,6 +1357,20 @@ def test_chunked_response(self): 'columns': ['fieldKey', 'fieldType']}]} ).__repr__()) + def test_wrong_protocol(self): + """ Test invalid protocol for sending packet """ + client = InfluxDBClient('localhost', 8086, 'username', 'password', + use_udp=True) + with self.assertRaises(InfluxDBClientError): + client.send_packet(packet="", protocol="Json") + + def test_udp_flag(self): + """ Test invalid protocol for sending packet """ + client = InfluxDBClient('localhost', 8086, 'username', 'password', + use_udp=False) + with self.assertRaises(RuntimeError): + client.send_packet(packet="", protocol="json") + class FakeClient(InfluxDBClient): """Set up a fake client instance of InfluxDBClient.""" diff --git a/influxdb/tests/server_tests/base.py b/influxdb/tests/server_tests/base.py index fe722870..50f7e88b 100644 --- a/influxdb/tests/server_tests/base.py +++ b/influxdb/tests/server_tests/base.py @@ -17,23 +17,35 @@ from influxdb.dataframe_client import DataFrameClient +def _create_new_client(host, port, username, password, database, + use_msgpack=False): + return InfluxDBClient(host, port, username, password, database=database, + use_msgpack=use_msgpack) + + +def _create_new_dataframe_client(host, port, username, password, database, + use_msgpack=False): + return DataFrameClient(host, port, username, password, database=database, + use_msgpack=use_msgpack) + + def _setup_influxdb_server(inst): inst.influxd_inst = InfluxDbInstance( inst.influxdb_template_conf, udp_enabled=getattr(inst, 'influxdb_udp_enabled', False), ) - inst.cli = InfluxDBClient('localhost', - inst.influxd_inst.http_port, - 'root', - '', - database='db') + inst.cli = _create_new_client('localhost', + inst.influxd_inst.http_port, + 'root', + '', + database='db') if not using_pypy: - inst.cliDF = DataFrameClient('localhost', - inst.influxd_inst.http_port, - 'root', - '', - database='db') + inst.cliDF = _create_new_dataframe_client('localhost', + inst.influxd_inst.http_port, + 'root', + '', + database='db') def _teardown_influxdb_server(inst): diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index fda3f720..fcb55897 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -34,7 +34,6 @@ import pandas as pd from pandas.util.testing import assert_frame_equal - THIS_DIR = os.path.abspath(os.path.dirname(__file__)) @@ -61,7 +60,8 @@ def point(series_name, timestamp=None, tags=None, **fields): }, "time": "2009-11-10T23:00:00Z", "fields": { - "value": 0.64 + "value": 0.64, + "another_value": 2 } } ] @@ -76,7 +76,58 @@ def point(series_name, timestamp=None, tags=None, **fields): }, "time": "2009-11-10T23:01:35Z", "fields": { - "value": 33.0 + "value": 33.0, + "another_value": 7 + } + } +] + +mixed_series_dummy_points_part_1 = [ + { + "measurement": "cpu_load_short_mixed", + "tags": { + "host": "server01", + "region": "us-west" + }, + "time": "2009-11-10T23:00:00Z", + "fields": { + "value": 0.64 + } + }, + { + "measurement": "cpu_load_short_mixed", + "tags": { + "host": "server01", + "region": "us-west" + }, + "time": "2009-11-10T23:01:00Z", + "fields": { + "value": 0.65 + } + }, + { + "measurement": "cpu_load_short_mixed", + "tags": { + "host": "server01", + "region": "us-west" + }, + "time": "2009-11-10T23:02:00Z", + "fields": { + "value": 0.35 + } + } +] + +mixed_series_dummy_points_part_2 = [ + { + "measurement": "cpu_load_short_mixed", + "tags": { + "host": "server01", + "region": "us-west" + }, + "time": "2009-11-10T23:03:00Z", + "fields": { + "value": 1.0 } } ] @@ -87,25 +138,24 @@ def point(series_name, timestamp=None, tags=None, **fields): "tags": {"host": "server01", "region": "us-west"}, "dataframe": pd.DataFrame( - [[0.64]], columns=['value'], + [[0.64, 2]], columns=['value', 'another_value'], index=pd.to_datetime(["2009-11-10T23:00:00Z"])) } dummy_points_df = [{ "measurement": "cpu_load_short", "tags": {"host": "server01", "region": "us-west"}, "dataframe": pd.DataFrame( - [[0.64]], columns=['value'], + [[0.64, 2]], columns=['value', 'another_value'], index=pd.to_datetime(["2009-11-10T23:00:00Z"])), }, { "measurement": "memory", "tags": {"host": "server01", "region": "us-west"}, "dataframe": pd.DataFrame( - [[33]], columns=['value'], + [[33, 7]], columns=['value', 'another_value'], index=pd.to_datetime(["2009-11-10T23:01:35Z"]) ) }] - dummy_point_without_timestamp = [ { "measurement": "cpu_load_short", @@ -114,7 +164,8 @@ def point(series_name, timestamp=None, tags=None, **fields): "region": "us-west" }, "fields": { - "value": 0.64 + "value": 0.64, + "another_value": 2 } } ] @@ -285,12 +336,14 @@ def test_write_check_read(self): self.test_write() time.sleep(1) rsp = self.cli.query('SELECT * FROM cpu_load_short', database='db') - self.assertListEqual([{'value': 0.64, 'time': '2009-11-10T23:00:00Z', + self.assertListEqual([{'value': 0.64, + 'another_value': 2, + 'time': '2009-11-10T23:00:00Z', "host": "server01", "region": "us-west"}], list(rsp.get_points())) def test_write_points(self): - """Test writing points to the server.""" + """ Test writing points to the server.""" self.assertIs(True, self.cli.write_points(dummy_point)) @skip_if_pypy @@ -305,6 +358,43 @@ def test_write_points_DF(self): ) ) + def test_write_points_mixed_type(self): + self.assertIs(True, self.cli.write_points(mixed_series_dummy_points_part_1)) + self.assertIs(True, self.cli.write_points(mixed_series_dummy_points_part_2)) + + client_1 = InfluxDBClient('localhost', self.influxd_inst.http_port, + 'root', '', database='db', use_msgpack=False) + client_2 = InfluxDBClient('localhost', self.influxd_inst.http_port, + 'root', '', database='db', use_msgpack=True) + + rsp1 = client_1.query('SELECT * FROM cpu_load_short_mixed') + rsp2 = client_2.query('SELECT * FROM cpu_load_short_mixed') + + for res in [rsp1, rsp2]: + self.assertEqual( + list(res), + [[ + {'value': 0.64, + 'time': '2009-11-10T23:00:00Z', + "host": "server01", + "region": "us-west"}, + {'value': 0.65, + 'time': '2009-11-10T23:01:00Z', + "host": "server01", + "region": "us-west"}, + {'value': 0.35, + 'time': '2009-11-10T23:02:00Z', + "host": "server01", + "region": "us-west"}, + {'value': 1.0, + 'time': '2009-11-10T23:03:00Z', + "host": "server01", + "region": "us-west"} + ]] + ) + + self.assertEqual(rsp1, rsp2) + def test_write_points_check_read(self): """Test writing points and check read back.""" self.test_write_points() @@ -315,6 +405,7 @@ def test_write_points_check_read(self): list(rsp), [[ {'value': 0.64, + 'another_value': 2, 'time': '2009-11-10T23:00:00Z', "host": "server01", "region": "us-west"} @@ -329,6 +420,7 @@ def test_write_points_check_read(self): pt, {'time': '2009-11-10T23:00:00Z', 'value': 0.64, + 'another_value': 2, "host": "server01", "region": "us-west"} ) @@ -364,6 +456,7 @@ def test_write_multiple_points_different_series(self): self.assertEqual( [[ {'value': 0.64, + 'another_value': 2, 'time': '2009-11-10T23:00:00Z', "host": "server01", "region": "us-west"} @@ -377,6 +470,7 @@ def test_write_multiple_points_different_series(self): rsp, [[ {'value': 33, + 'another_value': 7, 'time': '2009-11-10T23:01:35Z', "host": "server01", "region": "us-west"} @@ -387,7 +481,7 @@ def test_select_into_as_post(self): """Test SELECT INTO is POSTed.""" self.assertIs(True, self.cli.write_points(dummy_points)) time.sleep(1) - rsp = self.cli.query('SELECT * INTO "newmeas" FROM "memory"') + _ = self.cli.query('SELECT * INTO "newmeas" FROM "memory"') rsp = self.cli.query('SELECT * FROM "newmeas"') lrsp = list(rsp) @@ -395,6 +489,7 @@ def test_select_into_as_post(self): lrsp, [[ {'value': 33, + 'another_value': 7, 'time': '2009-11-10T23:01:35Z', "host": "server01", "region": "us-west"} @@ -849,6 +944,7 @@ def test_write_points_udp(self): # this is dummy_points : [ {'value': 0.64, + 'another_value': 2, 'time': '2009-11-10T23:00:00Z', "host": "server01", "region": "us-west"} From 4bbe67bd334cd218299f5258182837cfaea39e48 Mon Sep 17 00:00:00 2001 From: Joris Geysens Date: Tue, 4 Feb 2020 13:07:40 +0100 Subject: [PATCH 2/7] Unit test both with and without msgpack. Retention policy replication is int. - retention policy creation : cast to int, adjust docs - client_test : some tests using @raises(Exception) were succeeding for the wrong reason (assertion in mock function): using self.assertRaises() instead. - test all functions with msgpack enabled and disabled (2 clients) --- influxdb/client.py | 6 +- influxdb/tests/client_test.py | 766 ++++++++++-------- .../server_tests/client_test_with_server.py | 9 +- 3 files changed, 415 insertions(+), 366 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 3c8bb14a..f4132b9a 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -22,7 +22,6 @@ from influxdb.line_protocol import make_lines, quote_ident, quote_literal from influxdb.resultset import ResultSet from .exceptions import InfluxDBClientError -from .exceptions import InfluxDBServerError class InfluxDBClient(object): @@ -716,7 +715,7 @@ def create_retention_policy(self, name, duration, replication, The minimum retention period is 1 hour. :type duration: str :param replication: the replication of the retention policy - :type replication: str + :type replication: int :param database: the database for which the retention policy is created. Defaults to current client's database :type database: str @@ -736,7 +735,7 @@ def create_retention_policy(self, name, duration, replication, "CREATE RETENTION POLICY {0} ON {1} " \ "DURATION {2} REPLICATION {3} SHARD DURATION {4}".format( quote_ident(name), quote_ident(database or self._database), - duration, replication, shard_duration) + duration, int(replication), shard_duration) if default is True: query_string += " DEFAULT" @@ -1100,7 +1099,6 @@ def send_packet(self, packet, protocol='json', time_precision=None): :param time_precision: Either 's', 'm', 'ms' or 'u', defaults to None :type time_precision: str """ - if not self._use_udp: raise RuntimeError("Unable to send packet : use_udp set to False") diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index a6ab2605..5afe9388 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -89,7 +89,10 @@ def setUp(self): # By default, raise exceptions on warnings warnings.simplefilter('error', FutureWarning) - self.cli = self._create_new_client() + self.clients = [ + self._create_new_client(use_msgpack=False), + self._create_new_client(use_msgpack=True) + ] self.dummy_points = [ { "measurement": "cpu_load_short", @@ -162,7 +165,7 @@ def test_cert(self): self.assertEqual(cli._session.cert, '/etc/pki/tls/private/dummy.crt') with self.assertRaises(ValueError): - cli = InfluxDBClient(cert='/etc/pki/tls/private/dummy.crt') + _ = InfluxDBClient(cert='/etc/pki/tls/private/dummy.crt') def test_switch_database(self): """Test switch database in TestInfluxDBClient object.""" @@ -230,18 +233,18 @@ def test_write_points_toplevel_attributes(self): status_code=204 ) - cli = InfluxDBClient(database='db') - cli.write_points( - self.dummy_points, - database='testdb', - tags={"tag": "hello"}, - retention_policy="somepolicy" - ) - self.assertEqual( - 'cpu_load_short,host=server01,region=us-west,tag=hello ' - 'value=0.64 1257894000123456000\n', - m.last_request.body.decode('utf-8'), - ) + for client in self.clients: + client.write_points( + self.dummy_points, + database='testdb', + tags={"tag": "hello"}, + retention_policy="somepolicy" + ) + self.assertEqual( + 'cpu_load_short,host=server01,region=us-west,tag=hello ' + 'value=0.64 1257894000123456000\n', + m.last_request.body.decode('utf-8'), + ) def test_write_points_batch(self): """Test write points batch for TestInfluxDBClient object.""" @@ -258,19 +261,20 @@ def test_write_points_batch(self): "value=12.0 1257894000000000000\n" ) - with requests_mock.Mocker() as m: - m.register_uri(requests_mock.POST, - "http://localhost:8086/write", - status_code=204) - cli = InfluxDBClient(database='db') - cli.write_points(points=dummy_points, - database='db', - tags={"host": "server01", - "region": "us-west"}, - batch_size=2) - self.assertEqual(m.call_count, 2) - self.assertEqual(expected_last_body, - m.last_request.body.decode('utf-8')) + for client in self.clients: + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + client.write_points(points=dummy_points, + database='db', + tags={"host": "server01", + "region": "us-west"}, + batch_size=2) + self.assertEqual(m.call_count, 2) + self.assertEqual(expected_last_body, + m.last_request.body.decode('utf-8')) def test_write_points_udp(self): """Test write points UDP for TestInfluxDBClient object.""" @@ -292,12 +296,16 @@ def test_write_points_udp(self): received_data.decode() ) + # TODO : test fails for the wrong reasons (assertion in mock) @raises(Exception) def test_write_points_fails(self): """Test write points fail for TestInfluxDBClient object.""" cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') with _mocked_session(cli, 'post', 500): - cli.write_points([]) + cli.write_points(points=self.dummy_points, + database='db', + tags={"host": "server01", + "region": "us-west"}) def test_write_points_with_precision(self): """Test write points with precision for TestInfluxDBClient object.""" @@ -430,32 +438,32 @@ def test_write_points_with_precision_udp(self): def test_write_points_bad_precision(self): """Test write points w/bad precision TestInfluxDBClient object.""" - cli = InfluxDBClient() - with self.assertRaisesRegexp( - Exception, - "Invalid time precision is given. " - "\(use 'n', 'u', 'ms', 's', 'm' or 'h'\)" - ): - cli.write_points( - self.dummy_points, - time_precision='g' - ) + for client in self.clients: + with self.assertRaisesRegexp( + Exception, + "Invalid time precision is given. " + "\(use 'n', 'u', 'ms', 's', 'm' or 'h'\)" + ): + client.write_points( + self.dummy_points, + time_precision='g' + ) def test_write_points_bad_consistency(self): """Test write points w/bad consistency value.""" - cli = InfluxDBClient() - with self.assertRaises(ValueError): - cli.write_points( - self.dummy_points, - consistency='boo' - ) + for client in self.clients: + with self.assertRaises(ValueError): + client.write_points( + self.dummy_points, + consistency='boo' + ) - @raises(Exception) def test_write_points_with_precision_fails(self): """Test write points w/precision fail for TestInfluxDBClient object.""" - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - with _mocked_session(cli, 'post', 500): - cli.write_points_with_precision([]) + for client in self.clients: + with _mocked_session(client, 'post', 500): + with self.assertRaises(AttributeError): + client.write_points_with_precision([]) def test_query(self): """Test query method for TestInfluxDBClient object.""" @@ -473,12 +481,13 @@ def test_query(self): "http://localhost:8086/query", text=example_response ) - rs = self.cli.query('select * from foo') + for client in self.clients: + rs = client.query('select * from foo') - self.assertListEqual( - list(rs[0].get_points()), - [{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}] - ) + self.assertListEqual( + list(rs[0].get_points()), + [{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}] + ) def test_query_msgpack(self): """Test query method with a messagepack response.""" @@ -496,7 +505,8 @@ def test_query_msgpack(self): headers={"Content-Type": "application/x-msgpack"}, content=example_response ) - rs = self.cli.query('select * from a') + client = self._create_new_client(use_msgpack=True) + rs = client.query('select * from a') self.assertListEqual( list(rs.get_points()), @@ -504,50 +514,49 @@ def test_query_msgpack(self): ) def test_query_msgpack_extended(self): - """Test query method with a msgpack response.""" - + """Test query method with a msgpack response - various types.""" example_response_obj = { - "results": [ + "results": [ { - "series": [ - { - "measurement": "sdfsdfsdf", - "columns": [ - "time", - "value_float", - "value_int", - "value_string" - ], - "values": [ - [ - "2009-11-10T23:00:00Z", - 0.64, - 2, - "some value" - ] - ] - } - ] + "series": [ + { + "measurement": "sdfsdfsdf", + "columns": [ + "time", + "value_float", + "value_int", + "value_string" + ], + "values": [ + [ + "2009-11-10T23:00:00Z", + 0.64, + 2, + "some value" + ] + ] + } + ] }, { - "series": [ - { - "measurement": "cpu_load_short", - "columns": [ - "time", - "value" - ], - "values": [ - [ - "2020-01-10T23:00:00Z", - -0.645468546312 - ] - ] - } - ] + "series": [ + { + "measurement": "cpu_load_short", + "columns": [ + "time", + "value" + ], + "values": [ + [ + "2020-01-10T23:00:00Z", + -0.645468546312 + ] + ] + } + ] } - ] - } + ] + } example_response_packed = msgpack.packb(example_response_obj) @@ -560,7 +569,8 @@ def test_query_msgpack_extended(self): content=example_response_packed ) - rs = self.cli.query('select * from a') + client = self._create_new_client(use_msgpack=True) + rs = client.query('select * from a') self.assertListEqual( list(rs[0].get_points()), @@ -591,12 +601,14 @@ def test_select_into_post(self): "http://localhost:8086/query", text=example_response ) - rs = self.cli.query('select * INTO newmeas from foo') - self.assertListEqual( - list(rs[0].get_points()), - [{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}] - ) + for client in self.clients: + rs = client.query('select * INTO newmeas from foo') + + self.assertListEqual( + list(rs[0].get_points()), + [{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}] + ) @unittest.skip('Not implemented for 0.9') def test_query_chunked(self): @@ -632,11 +644,12 @@ def test_query_chunked(self): [example_object, example_object] ) - @raises(Exception) def test_query_fail(self): """Test query failed for TestInfluxDBClient object.""" - with _mocked_session(self.cli, 'get', 401): - self.cli.query('select column_one from foo;') + for client in self.clients: + with _mocked_session(client, 'get', 401): + with self.assertRaises(InfluxDBClientError): + client.query('select column_one from foo;') def test_ping(self): """Test ping querying InfluxDB version.""" @@ -647,8 +660,9 @@ def test_ping(self): status_code=204, headers={'X-Influxdb-Version': '1.2.3'} ) - version = self.cli.ping() - self.assertEqual(version, '1.2.3') + for client in self.clients: + version = client.ping() + self.assertEqual(version, '1.2.3') def test_create_database(self): """Test create database for TestInfluxDBClient object.""" @@ -658,11 +672,12 @@ def test_create_database(self): "http://localhost:8086/query", text='{"results":[{}]}' ) - self.cli.create_database('new_db') - self.assertEqual( - m.last_request.qs['q'][0], - 'create database "new_db"' - ) + for client in self.clients: + client.create_database('new_db') + self.assertEqual( + m.last_request.qs['q'][0], + 'create database "new_db"' + ) def test_create_numeric_named_database(self): """Test create db w/numeric name for TestInfluxDBClient object.""" @@ -672,17 +687,19 @@ def test_create_numeric_named_database(self): "http://localhost:8086/query", text='{"results":[{}]}' ) - self.cli.create_database('123') - self.assertEqual( - m.last_request.qs['q'][0], - 'create database "123"' - ) + for client in self.clients: + client.create_database('123') + self.assertEqual( + m.last_request.qs['q'][0], + 'create database "123"' + ) - @raises(Exception) def test_create_database_fails(self): """Test create database fail for TestInfluxDBClient object.""" - with _mocked_session(self.cli, 'post', 401): - self.cli.create_database('new_db') + for client in self.clients: + with _mocked_session(client, 'post', 401): + with self.assertRaises(InfluxDBClientError): + client.create_database('new_db') def test_drop_database(self): """Test drop database for TestInfluxDBClient object.""" @@ -692,11 +709,12 @@ def test_drop_database(self): "http://localhost:8086/query", text='{"results":[{}]}' ) - self.cli.drop_database('new_db') - self.assertEqual( - m.last_request.qs['q'][0], - 'drop database "new_db"' - ) + for client in self.clients: + client.drop_database('new_db') + self.assertEqual( + m.last_request.qs['q'][0], + 'drop database "new_db"' + ) def test_drop_measurement(self): """Test drop measurement for TestInfluxDBClient object.""" @@ -706,11 +724,12 @@ def test_drop_measurement(self): "http://localhost:8086/query", text='{"results":[{}]}' ) - self.cli.drop_measurement('new_measurement') - self.assertEqual( - m.last_request.qs['q'][0], - 'drop measurement "new_measurement"' - ) + for client in self.clients: + client.drop_measurement('new_measurement') + self.assertEqual( + m.last_request.qs['q'][0], + 'drop measurement "new_measurement"' + ) def test_drop_numeric_named_database(self): """Test drop numeric db for TestInfluxDBClient object.""" @@ -720,11 +739,12 @@ def test_drop_numeric_named_database(self): "http://localhost:8086/query", text='{"results":[{}]}' ) - self.cli.drop_database('123') - self.assertEqual( - m.last_request.qs['q'][0], - 'drop database "123"' - ) + for client in self.clients: + client.drop_database('123') + self.assertEqual( + m.last_request.qs['q'][0], + 'drop database "123"' + ) def test_get_list_database(self): """Test get list of databases for TestInfluxDBClient object.""" @@ -737,18 +757,19 @@ def test_get_list_database(self): 'columns': ['name']}]} ]} - with _mocked_session(self.cli, 'get', 200, json.dumps(data)): - self.assertListEqual( - self.cli.get_list_database(), - [{'name': 'new_db_1'}, {'name': 'new_db_2'}] - ) + for client in self.clients: + with _mocked_session(client, 'get', 200, json.dumps(data)): + self.assertListEqual( + client.get_list_database(), + [{'name': 'new_db_1'}, {'name': 'new_db_2'}] + ) - @raises(Exception) def test_get_list_database_fails(self): """Test get list of dbs fail for TestInfluxDBClient object.""" - cli = InfluxDBClient('host', 8086, 'username', 'password') - with _mocked_session(cli, 'get', 401): - cli.get_list_database() + for client in self.clients: + with _mocked_session(client, 'get', 401): + with self.assertRaises(InfluxDBClientError): + client.get_list_database() def test_get_list_measurements(self): """Test get list of measurements for TestInfluxDBClient object.""" @@ -762,11 +783,12 @@ def test_get_list_measurements(self): ] } - with _mocked_session(self.cli, 'get', 200, json.dumps(data)): - self.assertListEqual( - self.cli.get_list_measurements(), - [{'name': 'cpu'}, {'name': 'disk'}] - ) + for client in self.clients: + with _mocked_session(client, 'get', 200, json.dumps(data)): + self.assertListEqual( + client.get_list_measurements(), + [{'name': 'cpu'}, {'name': 'disk'}] + ) def test_create_retention_policy_default(self): """Test create default ret policy for TestInfluxDBClient object.""" @@ -778,15 +800,16 @@ def test_create_retention_policy_default(self): "http://localhost:8086/query", text=example_response ) - self.cli.create_retention_policy( - 'somename', '1d', 4, default=True, database='db' - ) + for client in self.clients: + client.create_retention_policy( + 'somename', '1d', 4, default=True, database='db' + ) - self.assertEqual( - m.last_request.qs['q'][0], - 'create retention policy "somename" on ' - '"db" duration 1d replication 4 shard duration 0s default' - ) + self.assertEqual( + m.last_request.qs['q'][0], + 'create retention policy "somename" on ' + '"db" duration 1d replication 4 shard duration 0s default' + ) def test_create_retention_policy(self): """Test create retention policy for TestInfluxDBClient object.""" @@ -798,15 +821,16 @@ def test_create_retention_policy(self): "http://localhost:8086/query", text=example_response ) - self.cli.create_retention_policy( - 'somename', '1d', 4, database='db' - ) + for client in self.clients: + client.create_retention_policy( + 'somename', '1d', 4, database='db' + ) - self.assertEqual( - m.last_request.qs['q'][0], - 'create retention policy "somename" on ' - '"db" duration 1d replication 4 shard duration 0s' - ) + self.assertEqual( + m.last_request.qs['q'][0], + 'create retention policy "somename" on ' + '"db" duration 1d replication 4 shard duration 0s' + ) def test_create_retention_policy_shard_duration(self): """Test create retention policy with a custom shard duration.""" @@ -818,16 +842,17 @@ def test_create_retention_policy_shard_duration(self): "http://localhost:8086/query", text=example_response ) - self.cli.create_retention_policy( - 'somename2', '1d', 4, database='db', - shard_duration='1h' - ) + for client in self.clients: + client.create_retention_policy( + 'somename2', '1d', 4, database='db', + shard_duration='1h' + ) - self.assertEqual( - m.last_request.qs['q'][0], - 'create retention policy "somename2" on ' - '"db" duration 1d replication 4 shard duration 1h' - ) + self.assertEqual( + m.last_request.qs['q'][0], + 'create retention policy "somename2" on ' + '"db" duration 1d replication 4 shard duration 1h' + ) def test_create_retention_policy_shard_duration_default(self): """Test create retention policy with a default shard duration.""" @@ -839,17 +864,18 @@ def test_create_retention_policy_shard_duration_default(self): "http://localhost:8086/query", text=example_response ) - self.cli.create_retention_policy( - 'somename3', '1d', 4, database='db', - shard_duration='1h', default=True - ) + for client in self.clients: + client.create_retention_policy( + 'somename3', '1d', 4, database='db', + shard_duration='1h', default=True + ) - self.assertEqual( - m.last_request.qs['q'][0], - 'create retention policy "somename3" on ' - '"db" duration 1d replication 4 shard duration 1h ' - 'default' - ) + self.assertEqual( + m.last_request.qs['q'][0], + 'create retention policy "somename3" on ' + '"db" duration 1d replication 4 shard duration 1h ' + 'default' + ) def test_alter_retention_policy(self): """Test alter retention policy for TestInfluxDBClient object.""" @@ -861,43 +887,42 @@ def test_alter_retention_policy(self): "http://localhost:8086/query", text=example_response ) - # Test alter duration - self.cli.alter_retention_policy('somename', 'db', - duration='4d') - self.assertEqual( - m.last_request.qs['q'][0], - 'alter retention policy "somename" on "db" duration 4d' - ) - # Test alter replication - self.cli.alter_retention_policy('somename', 'db', - replication=4) - self.assertEqual( - m.last_request.qs['q'][0], - 'alter retention policy "somename" on "db" replication 4' - ) - - # Test alter shard duration - self.cli.alter_retention_policy('somename', 'db', - shard_duration='1h') - self.assertEqual( - m.last_request.qs['q'][0], - 'alter retention policy "somename" on "db" shard duration 1h' - ) + for client in self.clients: + # Test alter duration + client.alter_retention_policy('somename', 'db', duration='4d') + self.assertEqual( + m.last_request.qs['q'][0], + 'alter retention policy "somename" on "db" duration 4d' + ) + # Test alter replication + client.alter_retention_policy('somename', 'db', replication=4) + self.assertEqual( + m.last_request.qs['q'][0], + 'alter retention policy "somename" on "db" replication 4' + ) + + # Test alter shard duration + client.alter_retention_policy('somename', 'db', + shard_duration='1h') + self.assertEqual( + m.last_request.qs['q'][0], + 'alter retention policy "somename" ' + 'on "db" shard duration 1h' + ) + + # Test alter default + client.alter_retention_policy('somename', 'db', default=True) + self.assertEqual( + m.last_request.qs['q'][0], + 'alter retention policy "somename" on "db" default' + ) - # Test alter default - self.cli.alter_retention_policy('somename', 'db', - default=True) - self.assertEqual( - m.last_request.qs['q'][0], - 'alter retention policy "somename" on "db" default' - ) - - @raises(Exception) def test_alter_retention_policy_invalid(self): """Test invalid alter ret policy for TestInfluxDBClient object.""" - cli = InfluxDBClient('host', 8086, 'username', 'password') - with _mocked_session(cli, 'get', 400): - self.cli.alter_retention_policy('somename', 'db') + for client in self.clients: + with _mocked_session(client, 'post', 400): + with self.assertRaises(InfluxDBClientError): + client.alter_retention_policy('somename', 'db') def test_drop_retention_policy(self): """Test drop retention policy for TestInfluxDBClient object.""" @@ -909,23 +934,25 @@ def test_drop_retention_policy(self): "http://localhost:8086/query", text=example_response ) - self.cli.drop_retention_policy('somename', 'db') - self.assertEqual( - m.last_request.qs['q'][0], - 'drop retention policy "somename" on "db"' - ) + for client in self.clients: + client.drop_retention_policy('somename', 'db') + self.assertEqual( + m.last_request.qs['q'][0], + 'drop retention policy "somename" on "db"' + ) - @raises(Exception) def test_drop_retention_policy_fails(self): """Test failed drop ret policy for TestInfluxDBClient object.""" - cli = InfluxDBClient('host', 8086, 'username', 'password') - with _mocked_session(cli, 'delete', 401): - cli.drop_retention_policy('default', 'db') + for client in self.clients: + with _mocked_session(client, 'post', 401): + with self.assertRaises(InfluxDBClientError): + client.drop_retention_policy('default', 'db') def test_get_list_retention_policies(self): """Test get retention policies for TestInfluxDBClient object.""" example_response = \ - '{"results": [{"series": [{"values": [["fsfdsdf", "24h0m0s", 2]],'\ + '{"results": ' \ + '[{"series": [{"values": [["fsfdsdf", "24h0m0s", 2]],' \ ' "columns": ["name", "duration", "replicaN"]}]}]}' with requests_mock.Mocker() as m: @@ -934,15 +961,17 @@ def test_get_list_retention_policies(self): "http://localhost:8086/query", text=example_response ) - self.assertListEqual( - self.cli.get_list_retention_policies("db"), - [{'duration': '24h0m0s', - 'name': 'fsfdsdf', 'replicaN': 2}] - ) + for client in self.clients: + self.assertListEqual( + client.get_list_retention_policies("db"), + [{'duration': '24h0m0s', + 'name': 'fsfdsdf', 'replicaN': 2}] + ) @mock.patch('requests.Session.request') def test_request_retry(self, mock_request): """Test that two connection errors will be handled.""" + class CustomMock(object): """Create custom mock object for test.""" @@ -962,14 +991,15 @@ def connection_error(self, *args, **kwargs): mock_request.side_effect = CustomMock().connection_error - cli = InfluxDBClient(database='db') - cli.write_points( - self.dummy_points - ) + for client in self.clients: + client.write_points( + self.dummy_points + ) @mock.patch('requests.Session.request') def test_request_retry_raises(self, mock_request): """Test that three requests errors will not be handled.""" + class CustomMock(object): """Create custom mock object for test.""" @@ -987,22 +1017,21 @@ def connection_error(self, *args, **kwargs): r.status_code = 200 return r - mock_request.side_effect = CustomMock().connection_error - - cli = InfluxDBClient(database='db') - - with self.assertRaises(requests.exceptions.HTTPError): - cli.write_points(self.dummy_points) + for client in self.clients: + mock_request.side_effect = CustomMock().connection_error + with self.assertRaises(requests.exceptions.HTTPError): + client.write_points(self.dummy_points) @mock.patch('requests.Session.request') def test_random_request_retry(self, mock_request): """Test that a random number of connection errors will be handled.""" + class CustomMock(object): """Create custom mock object for test.""" - def __init__(self, retries): + def __init__(self, _retries): self.i = 0 - self.retries = retries + self.retries = _retries def connection_error(self, *args, **kwargs): """Handle a connection error for the CustomMock object.""" @@ -1024,6 +1053,7 @@ def connection_error(self, *args, **kwargs): @mock.patch('requests.Session.request') def test_random_request_retry_raises(self, mock_request): """Test a random number of conn errors plus one will not be handled.""" + class CustomMock(object): """Create custom mock object for test.""" @@ -1064,10 +1094,11 @@ def test_get_list_users(self): text=example_response ) - self.assertListEqual( - self.cli.get_list_users(), - [{'user': 'test', 'admin': False}] - ) + for client in self.clients: + self.assertListEqual( + client.get_list_users(), + [{'user': 'test', 'admin': False}] + ) def test_get_list_users_empty(self): """Test get empty userlist for TestInfluxDBClient object.""" @@ -1080,8 +1111,8 @@ def test_get_list_users_empty(self): "http://localhost:8086/query", text=example_response ) - - self.assertListEqual(self.cli.get_list_users(), []) + for client in self.clients: + self.assertListEqual(client.get_list_users(), []) def test_grant_admin_privileges(self): """Test grant admin privs for TestInfluxDBClient object.""" @@ -1093,19 +1124,21 @@ def test_grant_admin_privileges(self): "http://localhost:8086/query", text=example_response ) - self.cli.grant_admin_privileges('test') + for client in self.clients: + client.grant_admin_privileges('test') - self.assertEqual( - m.last_request.qs['q'][0], - 'grant all privileges to "test"' - ) + self.assertEqual( + m.last_request.qs['q'][0], + 'grant all privileges to "test"' + ) - @raises(Exception) def test_grant_admin_privileges_invalid(self): """Test grant invalid admin privs for TestInfluxDBClient object.""" cli = InfluxDBClient('host', 8086, 'username', 'password') with _mocked_session(cli, 'get', 400): - self.cli.grant_admin_privileges('') + for client in self.clients: + with self.assertRaises(InfluxDBClientError): + client.grant_admin_privileges('') def test_revoke_admin_privileges(self): """Test revoke admin privs for TestInfluxDBClient object.""" @@ -1117,19 +1150,21 @@ def test_revoke_admin_privileges(self): "http://localhost:8086/query", text=example_response ) - self.cli.revoke_admin_privileges('test') + for client in self.clients: + client.revoke_admin_privileges('test') - self.assertEqual( - m.last_request.qs['q'][0], - 'revoke all privileges from "test"' - ) + self.assertEqual( + m.last_request.qs['q'][0], + 'revoke all privileges from "test"' + ) - @raises(Exception) def test_revoke_admin_privileges_invalid(self): """Test revoke invalid admin privs for TestInfluxDBClient object.""" cli = InfluxDBClient('host', 8086, 'username', 'password') with _mocked_session(cli, 'get', 400): - self.cli.revoke_admin_privileges('') + for client in self.clients: + with self.assertRaises(InfluxDBClientError): + client.revoke_admin_privileges('') def test_grant_privilege(self): """Test grant privs for TestInfluxDBClient object.""" @@ -1141,19 +1176,20 @@ def test_grant_privilege(self): "http://localhost:8086/query", text=example_response ) - self.cli.grant_privilege('read', 'testdb', 'test') + for client in self.clients: + client.grant_privilege('read', 'testdb', 'test') - self.assertEqual( - m.last_request.qs['q'][0], - 'grant read on "testdb" to "test"' - ) + self.assertEqual( + m.last_request.qs['q'][0], + 'grant read on "testdb" to "test"' + ) - @raises(Exception) def test_grant_privilege_invalid(self): """Test grant invalid privs for TestInfluxDBClient object.""" - cli = InfluxDBClient('host', 8086, 'username', 'password') - with _mocked_session(cli, 'get', 400): - self.cli.grant_privilege('', 'testdb', 'test') + for client in self.clients: + with _mocked_session(client, 'post', 400): + with self.assertRaises(InfluxDBClientError): + client.grant_privilege('', 'testdb', 'test') def test_revoke_privilege(self): """Test revoke privs for TestInfluxDBClient object.""" @@ -1165,19 +1201,21 @@ def test_revoke_privilege(self): "http://localhost:8086/query", text=example_response ) - self.cli.revoke_privilege('read', 'testdb', 'test') - self.assertEqual( - m.last_request.qs['q'][0], - 'revoke read on "testdb" from "test"' - ) + for client in self.clients: + client.revoke_privilege('read', 'testdb', 'test') + + self.assertEqual( + m.last_request.qs['q'][0], + 'revoke read on "testdb" from "test"' + ) - @raises(Exception) def test_revoke_privilege_invalid(self): """Test revoke invalid privs for TestInfluxDBClient object.""" - cli = InfluxDBClient('host', 8086, 'username', 'password') - with _mocked_session(cli, 'get', 400): - self.cli.revoke_privilege('', 'testdb', 'test') + for client in self.clients: + with _mocked_session(client, 'post', 400): + with self.assertRaises(InfluxDBClientError): + client.revoke_privilege('', 'testdb', 'test') def test_get_list_privileges(self): """Tst get list of privs for TestInfluxDBClient object.""" @@ -1191,20 +1229,21 @@ def test_get_list_privileges(self): ]} ]} - with _mocked_session(self.cli, 'get', 200, json.dumps(data)): - self.assertListEqual( - self.cli.get_list_privileges('test'), - [{'database': 'db1', 'privilege': 'READ'}, - {'database': 'db2', 'privilege': 'ALL PRIVILEGES'}, - {'database': 'db3', 'privilege': 'NO PRIVILEGES'}] - ) + for client in self.clients: + with _mocked_session(client, 'get', 200, json.dumps(data)): + self.assertListEqual( + client.get_list_privileges('test'), + [{'database': 'db1', 'privilege': 'READ'}, + {'database': 'db2', 'privilege': 'ALL PRIVILEGES'}, + {'database': 'db3', 'privilege': 'NO PRIVILEGES'}] + ) - @raises(Exception) def test_get_list_privileges_fails(self): """Test failed get list of privs for TestInfluxDBClient object.""" - cli = InfluxDBClient('host', 8086, 'username', 'password') - with _mocked_session(cli, 'get', 401): - cli.get_list_privileges('test') + for client in self.clients: + with _mocked_session(client, 'get', 401): + with self.assertRaises(InfluxDBClientError): + client.get_list_privileges('test') def test_get_list_continuous_queries(self): """Test getting a list of continuous queries.""" @@ -1233,32 +1272,34 @@ def test_get_list_continuous_queries(self): ] } - with _mocked_session(self.cli, 'get', 200, json.dumps(data)): - self.assertListEqual( - self.cli.get_list_continuous_queries(), - [ - { - 'testdb01': [ - {'name': 'testname01', 'query': 'testquery01'}, - {'name': 'testname02', 'query': 'testquery02'} - ] - }, - { - 'testdb02': [ - {'name': 'testname03', 'query': 'testquery03'} - ] - }, - { - 'testdb03': [] - } - ] - ) + for client in self.clients: + with _mocked_session(client, 'get', 200, json.dumps(data)): + self.assertListEqual( + client.get_list_continuous_queries(), + [ + { + 'testdb01': [ + {'name': 'testname01', 'query': 'testquery01'}, + {'name': 'testname02', 'query': 'testquery02'} + ] + }, + { + 'testdb02': [ + {'name': 'testname03', 'query': 'testquery03'} + ] + }, + { + 'testdb03': [] + } + ] + ) - @raises(Exception) def test_get_list_continuous_queries_fails(self): """Test failing to get a list of continuous queries.""" - with _mocked_session(self.cli, 'get', 400): - self.cli.get_list_continuous_queries() + for client in self.clients: + with _mocked_session(client, 'get', 400): + with self.assertRaises(InfluxDBClientError): + client.get_list_continuous_queries() def test_create_continuous_query(self): """Test continuous query creation.""" @@ -1271,27 +1312,31 @@ def test_create_continuous_query(self): ) query = 'SELECT count("value") INTO "6_months"."events" FROM ' \ '"events" GROUP BY time(10m)' - self.cli.create_continuous_query('cq_name', query, 'db_name') - self.assertEqual( - m.last_request.qs['q'][0], - 'create continuous query "cq_name" on "db_name" begin select ' - 'count("value") into "6_months"."events" from "events" group ' - 'by time(10m) end' - ) - self.cli.create_continuous_query('cq_name', query, 'db_name', - 'EVERY 10s FOR 2m') - self.assertEqual( - m.last_request.qs['q'][0], - 'create continuous query "cq_name" on "db_name" resample ' - 'every 10s for 2m begin select count("value") into ' - '"6_months"."events" from "events" group by time(10m) end' - ) - @raises(Exception) + for client in self.clients: + client.create_continuous_query('cq_name', query, 'db_name') + self.assertEqual( + m.last_request.qs['q'][0], + 'create continuous query "cq_name" on "db_name" ' + 'begin select count("value") into "6_months"."events" ' + 'from "events" group by time(10m) end' + ) + client.create_continuous_query('cq_name', query, 'db_name', + 'EVERY 10s FOR 2m') + self.assertEqual( + m.last_request.qs['q'][0], + 'create continuous query "cq_name" on "db_name" resample ' + 'every 10s for 2m begin select count("value") into ' + '"6_months"."events" from "events" group by time(10m) end' + ) + def test_create_continuous_query_fails(self): """Test failing to create a continuous query.""" - with _mocked_session(self.cli, 'get', 400): - self.cli.create_continuous_query('cq_name', 'select', 'db_name') + for client in self.clients: + with _mocked_session(client, 'get', 400): + with self.assertRaises(InfluxDBClientError): + client.create_continuous_query('cq_name', 'select', + 'db_name') def test_drop_continuous_query(self): """Test dropping a continuous query.""" @@ -1302,17 +1347,19 @@ def test_drop_continuous_query(self): "http://localhost:8086/query", text=json.dumps(data) ) - self.cli.drop_continuous_query('cq_name', 'db_name') - self.assertEqual( - m.last_request.qs['q'][0], - 'drop continuous query "cq_name" on "db_name"' - ) + for client in self.clients: + client.drop_continuous_query('cq_name', 'db_name') + self.assertEqual( + m.last_request.qs['q'][0], + 'drop continuous query "cq_name" on "db_name"' + ) - @raises(Exception) def test_drop_continuous_query_fails(self): """Test failing to drop a continuous query.""" - with _mocked_session(self.cli, 'get', 400): - self.cli.drop_continuous_query('cq_name', 'db_name') + for client in self.clients: + with _mocked_session(client, 'get', 400): + with self.assertRaises(InfluxDBClientError): + client.drop_continuous_query('cq_name', 'db_name') def test_invalid_port_fails(self): """Test invalid port fail for TestInfluxDBClient object.""" @@ -1339,33 +1386,34 @@ def test_chunked_response(self): "http://localhost:8086/query", text=example_response ) - response = self.cli.query('show series limit 4 offset 0', - chunked=True, chunk_size=4) - self.assertTrue(len(response) == 4) - self.assertEqual(response.__repr__(), ResultSet( - {'series': [{'values': [['value', 'integer']], - 'name': 'cpu', - 'columns': ['fieldKey', 'fieldType']}, - {'values': [['value', 'integer']], - 'name': 'iops', - 'columns': ['fieldKey', 'fieldType']}, - {'values': [['value', 'integer']], - 'name': 'load', - 'columns': ['fieldKey', 'fieldType']}, - {'values': [['value', 'integer']], - 'name': 'memory', - 'columns': ['fieldKey', 'fieldType']}]} - ).__repr__()) + for client in self.clients: + response = client.query('show series limit 4 offset 0', + chunked=True, chunk_size=4) + self.assertTrue(len(response) == 4) + self.assertEqual(response.__repr__(), ResultSet( + {'series': [{'values': [['value', 'integer']], + 'name': 'cpu', + 'columns': ['fieldKey', 'fieldType']}, + {'values': [['value', 'integer']], + 'name': 'iops', + 'columns': ['fieldKey', 'fieldType']}, + {'values': [['value', 'integer']], + 'name': 'load', + 'columns': ['fieldKey', 'fieldType']}, + {'values': [['value', 'integer']], + 'name': 'memory', + 'columns': ['fieldKey', 'fieldType']}]} + ).__repr__()) def test_wrong_protocol(self): - """ Test invalid protocol for sending packet """ + """Test invalid protocol for sending packet.""" client = InfluxDBClient('localhost', 8086, 'username', 'password', use_udp=True) with self.assertRaises(InfluxDBClientError): client.send_packet(packet="", protocol="Json") def test_udp_flag(self): - """ Test invalid protocol for sending packet """ + """Test invalid protocol for sending packet.""" client = InfluxDBClient('localhost', 8086, 'username', 'password', use_udp=False) with self.assertRaises(RuntimeError): diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index fcb55897..40d5b03e 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -343,7 +343,7 @@ def test_write_check_read(self): list(rsp.get_points())) def test_write_points(self): - """ Test writing points to the server.""" + """Test writing points to the server.""" self.assertIs(True, self.cli.write_points(dummy_point)) @skip_if_pypy @@ -359,8 +359,11 @@ def test_write_points_DF(self): ) def test_write_points_mixed_type(self): - self.assertIs(True, self.cli.write_points(mixed_series_dummy_points_part_1)) - self.assertIs(True, self.cli.write_points(mixed_series_dummy_points_part_2)) + """Test writing points with mixed type and reading them in again.""" + self.assertIs(True, + self.cli.write_points(mixed_series_dummy_points_part_1)) + self.assertIs(True, + self.cli.write_points(mixed_series_dummy_points_part_2)) client_1 = InfluxDBClient('localhost', self.influxd_inst.http_port, 'root', '', database='db', use_msgpack=False) From 088da3cf78c66b38119fbb8042439489533d9ffa Mon Sep 17 00:00:00 2001 From: Joris Geysens Date: Tue, 4 Feb 2020 14:22:51 +0100 Subject: [PATCH 3/7] Fix client tests that are failing. --- influxdb/tests/client_test.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 5afe9388..af3701f9 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -1134,9 +1134,8 @@ def test_grant_admin_privileges(self): def test_grant_admin_privileges_invalid(self): """Test grant invalid admin privs for TestInfluxDBClient object.""" - cli = InfluxDBClient('host', 8086, 'username', 'password') - with _mocked_session(cli, 'get', 400): - for client in self.clients: + for client in self.clients: + with _mocked_session(client, 'post', 400): with self.assertRaises(InfluxDBClientError): client.grant_admin_privileges('') @@ -1160,9 +1159,8 @@ def test_revoke_admin_privileges(self): def test_revoke_admin_privileges_invalid(self): """Test revoke invalid admin privs for TestInfluxDBClient object.""" - cli = InfluxDBClient('host', 8086, 'username', 'password') - with _mocked_session(cli, 'get', 400): - for client in self.clients: + for client in self.clients: + with _mocked_session(client, 'post', 400): with self.assertRaises(InfluxDBClientError): client.revoke_admin_privileges('') From 065ed44451b619c395cbc811877a209e4c907c10 Mon Sep 17 00:00:00 2001 From: Joris Geysens Date: Fri, 21 Feb 2020 15:24:12 +0100 Subject: [PATCH 4/7] Make msgpack default option, fix demonstrator test. --- influxdb/client.py | 2 +- influxdb/tests/server_tests/client_test_with_server.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index f4132b9a..24e52ea6 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -88,7 +88,7 @@ def __init__(self, pool_size=10, path='', cert=None, - use_msgpack=False + use_msgpack=True ): """Construct a new InfluxDBClient object.""" self.__host = host diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index 40d5b03e..333cc56c 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -374,7 +374,7 @@ def test_write_points_mixed_type(self): rsp2 = client_2.query('SELECT * FROM cpu_load_short_mixed') for res in [rsp1, rsp2]: - self.assertEqual( + self.assertNotEqual( list(res), [[ {'value': 0.64, From 9a8b66dc112942c24f6a13110d0e778f54c432a0 Mon Sep 17 00:00:00 2001 From: Joris Geysens Date: Mon, 24 Feb 2020 08:49:49 +0100 Subject: [PATCH 5/7] Fix server test test_write_points_mixed_type --- .../server_tests/client_test_with_server.py | 65 ++++++++++++------- 1 file changed, 42 insertions(+), 23 deletions(-) diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index 333cc56c..093e3684 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -373,30 +373,49 @@ def test_write_points_mixed_type(self): rsp1 = client_1.query('SELECT * FROM cpu_load_short_mixed') rsp2 = client_2.query('SELECT * FROM cpu_load_short_mixed') - for res in [rsp1, rsp2]: - self.assertNotEqual( - list(res), - [[ - {'value': 0.64, - 'time': '2009-11-10T23:00:00Z', - "host": "server01", - "region": "us-west"}, - {'value': 0.65, - 'time': '2009-11-10T23:01:00Z', - "host": "server01", - "region": "us-west"}, - {'value': 0.35, - 'time': '2009-11-10T23:02:00Z', - "host": "server01", - "region": "us-west"}, - {'value': 1.0, - 'time': '2009-11-10T23:03:00Z', - "host": "server01", - "region": "us-west"} - ]] - ) + self.assertEqual( + list(rsp1), + [[ + {'value': 0.64, + 'time': '2009-11-10T23:00:00Z', + "host": "server01", + "region": "us-west"}, + {'value': 0.65, + 'time': '2009-11-10T23:01:00Z', + "host": "server01", + "region": "us-west"}, + {'value': 0.35, + 'time': '2009-11-10T23:02:00Z', + "host": "server01", + "region": "us-west"}, + {'value': 1, + 'time': '2009-11-10T23:03:00Z', + "host": "server01", + "region": "us-west"} + ]] + ) - self.assertEqual(rsp1, rsp2) + self.assertEqual( + list(rsp2), + [[ + {'value': 0.64, + 'time': '2009-11-10T23:00:00Z', + "host": "server01", + "region": "us-west"}, + {'value': 0.65, + 'time': '2009-11-10T23:01:00Z', + "host": "server01", + "region": "us-west"}, + {'value': 0.35, + 'time': '2009-11-10T23:02:00Z', + "host": "server01", + "region": "us-west"}, + {'value': 1.0, + 'time': '2009-11-10T23:03:00Z', + "host": "server01", + "region": "us-west"} + ]] + ) def test_write_points_check_read(self): """Test writing points and check read back.""" From 51d0213c601e9298b00a26e793df0e2ae1c38ba9 Mon Sep 17 00:00:00 2001 From: Joris Geysens Date: Mon, 24 Feb 2020 11:49:12 +0100 Subject: [PATCH 6/7] Remove check for JSON result in server test test_write_points_mixed_type --- .../server_tests/client_test_with_server.py | 27 +------------------ 1 file changed, 1 insertion(+), 26 deletions(-) diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index 093e3684..fbdd068e 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -365,36 +365,11 @@ def test_write_points_mixed_type(self): self.assertIs(True, self.cli.write_points(mixed_series_dummy_points_part_2)) - client_1 = InfluxDBClient('localhost', self.influxd_inst.http_port, - 'root', '', database='db', use_msgpack=False) client_2 = InfluxDBClient('localhost', self.influxd_inst.http_port, - 'root', '', database='db', use_msgpack=True) + 'root', '', database='db') - rsp1 = client_1.query('SELECT * FROM cpu_load_short_mixed') rsp2 = client_2.query('SELECT * FROM cpu_load_short_mixed') - self.assertEqual( - list(rsp1), - [[ - {'value': 0.64, - 'time': '2009-11-10T23:00:00Z', - "host": "server01", - "region": "us-west"}, - {'value': 0.65, - 'time': '2009-11-10T23:01:00Z', - "host": "server01", - "region": "us-west"}, - {'value': 0.35, - 'time': '2009-11-10T23:02:00Z', - "host": "server01", - "region": "us-west"}, - {'value': 1, - 'time': '2009-11-10T23:03:00Z', - "host": "server01", - "region": "us-west"} - ]] - ) - self.assertEqual( list(rsp2), [[ From 44a26eb0cdcf5fdddc8cfb13d01eec2f489ff99b Mon Sep 17 00:00:00 2001 From: Joris Geysens Date: Mon, 24 Feb 2020 15:59:57 +0100 Subject: [PATCH 7/7] Add use_msgpack docstring. --- influxdb/client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/influxdb/client.py b/influxdb/client.py index 24e52ea6..2f15e9af 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -68,6 +68,10 @@ class InfluxDBClient(object): as a single file containing the private key and the certificate, or as a tuple of both files’ paths, defaults to None :type cert: str + :param use_msgpack: A bool indicating to use msgpack to retrieve query + results from InfluxDB. If False, the fallback will be JSON. This flag + sets the Accept header of the request. Defaults to True + :type use_msgpack: bool :raises ValueError: if cert is provided but ssl is disabled (set to False) """