Skip to content

Commit a4a4fcf

Browse files
Merge pull request bryanyang0528#79 from harlev/query-stream
Support new query-stream API with HTTP/2
2 parents e74b650 + b0d6828 commit a4a4fcf

File tree

7 files changed

+218
-11
lines changed

7 files changed

+218
-11
lines changed

README.rst

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,49 @@ This command returns a generator. It can be printed e.g. by reading its values v
124124
{"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null}
125125
{"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}
126126

127+
Query with HTTP/2
128+
^^^^^^^^^^^^^^^^^
129+
Execute queries with the new ``/query-stream`` endpoint. Documented `here <https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#executing-pull-or-push-queries>`_
130+
131+
To execute a sql query use the same syntax as the regular query, with the additional ``use_http2=True`` parameter.
132+
133+
.. code:: python
134+
135+
client.query('select * from table1', use_http2=True)
136+
137+
A generator is returned, which yields responses such as the following example
138+
139+
::
140+
141+
{"queryId":"44d8413c-0018-423d-b58f-3f2064b9a312","columnNames":["ORDER_ID","TOTAL_AMOUNT","CUSTOMER_NAME"],"columnTypes":["INTEGER","DOUBLE","STRING"]}
142+
[3,43.0,"Palo Alto"]
143+
[3,43.0,"Palo Alto"]
144+
[3,43.0,"Palo Alto"]
145+
146+
To terminate the query above use the ``close_query`` call.
147+
Provide the ``queryId`` returned from the ``query`` call.
148+
149+
.. code:: python
150+
151+
client.close_query("44d8413c-0018-423d-b58f-3f2064b9a312")
152+
153+
Insert rows into a Stream with HTTP/2
154+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
155+
156+
Uses the new ``/inserts-stream`` endpoint. See `documentation <https://docs.ksqldb.io/en/0.10.0-ksqldb/developer-guide/ksqldb-rest-api/streaming-endpoint/#inserting-rows-into-an-existing-stream>`_
157+
158+
.. code:: python
159+
160+
rows = [
161+
{"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"},
162+
{"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"}
163+
]
164+
165+
results = client.inserts_stream("my_stream_name", rows)
166+
167+
An array of objects will be returned on success, with the status of each row inserted.
168+
169+
127170
Simplified API
128171
~~~~~~~~~~~~~~
129172

ksql/api.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import urllib
99
from copy import deepcopy
1010
from requests import Timeout
11+
from urllib.parse import urlparse
12+
from hyper import HTTPConnection
13+
1114

1215
from ksql.builder import SQLBuilder
1316
from ksql.errors import CreateError, InvalidQueryError, KSQLError
@@ -65,6 +68,42 @@ def ksql(self, ksql_string, stream_properties=None):
6568
res = json.loads(response)
6669
return res
6770

71+
def query2(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
72+
"""
73+
Process streaming incoming data with HTTP/2.
74+
75+
"""
76+
parsed_uri = urlparse(self.url)
77+
78+
logging.debug("KSQL generated: {}".format(query_string))
79+
sql_string = self._validate_sql_string(query_string)
80+
body = {"sql": sql_string}
81+
if stream_properties:
82+
body["properties"] = stream_properties
83+
else:
84+
body["properties"] = {}
85+
86+
with HTTPConnection(parsed_uri.netloc) as connection:
87+
streaming_response = self._request2(
88+
endpoint="query-stream", body=body, connection=connection
89+
)
90+
start_idle = None
91+
92+
if streaming_response.status == 200:
93+
for chunk in streaming_response.read_chunked():
94+
if chunk != b"\n":
95+
start_idle = None
96+
yield chunk.decode(encoding)
97+
98+
else:
99+
if not start_idle:
100+
start_idle = time.time()
101+
if idle_timeout and time.time() - start_idle > idle_timeout:
102+
print("Ending query because of time out! ({} seconds)".format(idle_timeout))
103+
return
104+
else:
105+
raise ValueError("Return code is {}.".format(streaming_response.status))
106+
68107
def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
69108
"""
70109
Process streaming incoming data.
@@ -93,6 +132,20 @@ def get_request(self, endpoint):
93132
auth = (self.api_key, self.secret) if self.api_key or self.secret else None
94133
return requests.get(endpoint, headers=self.headers, auth=auth)
95134

135+
def _request2(self, endpoint, connection, body, method="POST", encoding="utf-8"):
136+
url = "{}/{}".format(self.url, endpoint)
137+
data = json.dumps(body).encode(encoding)
138+
139+
headers = deepcopy(self.headers)
140+
if self.api_key and self.secret:
141+
base64string = base64.b64encode(bytes("{}:{}".format(self.api_key, self.secret), "utf-8")).decode("utf-8")
142+
headers["Authorization"] = "Basic %s" % base64string
143+
144+
connection.request(method=method.upper(), url=url, headers=headers, body=data)
145+
resp = connection.get_response()
146+
147+
return resp
148+
96149
def _request(self, endpoint, method="POST", sql_string="", stream_properties=None, encoding="utf-8"):
97150
url = "{}/{}".format(self.url, endpoint)
98151

@@ -126,6 +179,47 @@ def _request(self, endpoint, method="POST", sql_string="", stream_properties=Non
126179
else:
127180
return r
128181

182+
def close_query(self, query_id):
183+
body = {"queryId": query_id}
184+
data = json.dumps(body).encode("utf-8")
185+
url = "{}/{}".format(self.url, "close-query")
186+
187+
response = requests.post(url=url, data=data)
188+
189+
if response.status_code == 200:
190+
logging.debug("Successfully canceled Query ID: {}".format(query_id))
191+
return True
192+
elif response.status_code == 400:
193+
message = json.loads(response.content)["message"]
194+
logging.debug("Failed canceling Query ID: {}: {}".format(query_id, message))
195+
return False
196+
else:
197+
raise ValueError("Return code is {}.".format(response.status_code))
198+
199+
def inserts_stream(self, stream_name, rows):
200+
body = '{{"target":"{}"}}'.format(stream_name)
201+
for row in rows:
202+
body += '\n{}'.format(json.dumps(row))
203+
204+
parsed_uri = urlparse(self.url)
205+
url = "{}/{}".format(self.url, "inserts-stream")
206+
headers = deepcopy(self.headers)
207+
with HTTPConnection(parsed_uri.netloc) as connection:
208+
connection.request("POST", url, bytes(body, "utf-8"), headers)
209+
response = connection.get_response()
210+
result = response.read()
211+
212+
result_str = result.decode("utf-8")
213+
result_chunks = result_str.split("\n")
214+
return_arr = []
215+
for chunk in result_chunks:
216+
try:
217+
return_arr.append(json.loads(chunk))
218+
except:
219+
pass
220+
221+
return return_arr
222+
129223
@staticmethod
130224
def retry(exceptions, delay=1, max_retries=5):
131225
"""

ksql/client.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,31 @@ def get_properties(self):
4242
def ksql(self, ksql_string, stream_properties=None):
4343
return self.sa.ksql(ksql_string, stream_properties=stream_properties)
4444

45-
def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None, return_objects=None):
46-
results = self.sa.query(
47-
query_string=query_string,
48-
encoding=encoding,
49-
chunk_size=chunk_size,
50-
stream_properties=stream_properties,
51-
idle_timeout=idle_timeout,
52-
)
45+
def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None, use_http2=None, return_objects=None):
46+
if use_http2:
47+
yield from self.sa.query2(
48+
query_string=query_string,
49+
encoding=encoding,
50+
chunk_size=chunk_size,
51+
stream_properties=stream_properties,
52+
idle_timeout=idle_timeout,
53+
)
54+
else:
55+
results = self.sa.query(
56+
query_string=query_string,
57+
encoding=encoding,
58+
chunk_size=chunk_size,
59+
stream_properties=stream_properties,
60+
idle_timeout=idle_timeout,
61+
)
62+
63+
yield from process_query_result(results, return_objects)
64+
65+
def close_query(self, query_id):
66+
return self.sa.close_query(query_id)
5367

54-
yield from process_query_result(results, return_objects)
68+
def inserts_stream(self, stream_name, rows):
69+
return self.sa.inserts_stream(stream_name, rows)
5570

5671
def create_stream(self, table_name, columns_type, topic, value_format="JSON"):
5772
return self.sa.create_stream(

requirements-dev.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,4 @@ wcwidth==0.2.5
4545
wrapt==1.12.1
4646
yarl==1.4.2
4747
zipp==3.1.0
48+
hyper

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
requests
22
six
33
urllib3
4+
hyper

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ def get_install_requirements(path):
3232
'install_requires': [
3333
'requests',
3434
'six',
35-
'urllib3'
35+
'urllib3',
36+
'hyper'
3637
],
3738
'zip_safe': False,
3839
}

tests/test_client.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import requests
22
import unittest
3+
import json
34
import vcr
45
from confluent_kafka import Producer
56

@@ -104,14 +105,65 @@ def test_ksql_create_stream_w_properties(self):
104105
producer = Producer({"bootstrap.servers": self.bootstrap_servers})
105106
producer.produce(self.exist_topic, """{"order_id":3,"total_amount":43,"customer_name":"Palo Alto"}""")
106107
producer.flush()
108+
109+
# test legacy HTTP/1.1 request
107110
chunks = self.api_client.query(
108111
"select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties
109112
)
110113

114+
header = next(chunks)
115+
self.assertEqual(header, """[{"header":{"queryId":"none","schema":"`ORDER_ID` INTEGER, `TOTAL_AMOUNT` DOUBLE, `CUSTOMER_NAME` STRING"}},\n""")
116+
111117
for chunk in chunks:
112-
self.assertTrue(chunk)
118+
self.assertEqual(chunk, """{"row":{"columns":[3,43.0,"Palo Alto"]}},\n""")
113119
break
114120

121+
# test new HTTP/2 request
122+
chunks = self.api_client.query(
123+
"select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties, use_http2=True
124+
)
125+
126+
header = next(chunks)
127+
header_obj = json.loads(header)
128+
self.assertEqual(header_obj["columnNames"], ['ORDER_ID', 'TOTAL_AMOUNT', 'CUSTOMER_NAME'])
129+
self.assertEqual(header_obj["columnTypes"], ['INTEGER', 'DOUBLE', 'STRING'])
130+
131+
for chunk in chunks:
132+
chunk_obj = json.loads(chunk)
133+
self.assertEqual(chunk_obj, [3,43.0, "Palo Alto"])
134+
break
135+
136+
@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support HTTP/2")
137+
def test_ksql_close_query(self):
138+
result = self.api_client.close_query("123")
139+
140+
self.assertFalse(result)
141+
142+
@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
143+
def test_inserts_stream(self):
144+
topic = self.exist_topic
145+
stream_name = "TEST_INSERTS_STREAM_STREAM"
146+
ksql_string = "CREATE STREAM {} (ORDER_ID INT, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR) \
147+
WITH (kafka_topic='{}', value_format='JSON');".format(
148+
stream_name, topic
149+
)
150+
151+
streamProperties = {"ksql.streams.auto.offset.reset": "earliest"}
152+
153+
if "TEST_KSQL_CREATE_STREAM" not in utils.get_all_streams(self.api_client):
154+
r = self.api_client.ksql(ksql_string, stream_properties=streamProperties)
155+
self.assertEqual(r[0]["commandStatus"]["status"], "SUCCESS")
156+
157+
rows = [
158+
{"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"},
159+
{"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"}
160+
]
161+
162+
results = self.api_client.inserts_stream(stream_name, rows)
163+
164+
for result in results:
165+
self.assertEqual(result["status"], "ok")
166+
115167
@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
116168
def test_ksql_parse_query_result_with_utils(self):
117169
topic = "TEST_KSQL_PARSE_QUERY_RESULT_WITH_UTILS_TOPIC"

0 commit comments

Comments
 (0)