Skip to content

Commit 1b64b58

Browse files
committed
Merge remote-tracking branch 'origin/master'
2 parents 9e704e0 + 7e107ae commit 1b64b58

10 files changed

+430
-13
lines changed

Dockerfile

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
FROM frolvlad/alpine-python3
2-
32
WORKDIR /app
4-
COPY . /app
3+
COPY *requirements* /app/
54
RUN sed -i -e 's/v3\.8/edge/g' /etc/apk/repositories \
65
&& apk upgrade --update-cache --available \
76
&& apk add --no-cache librdkafka librdkafka-dev
87
RUN apk add --no-cache alpine-sdk python3-dev
98
RUN pip install -r requirements.txt
109
RUN pip install -r test-requirements.txt
10+
COPY . /app
1111
RUN pip install -e .
12+

README.rst

+52
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,15 @@ Supported Python version: 3.5+
1212
.. image:: https://codecov.io/gh/bryanyang0528/ksql-python/branch/master/graph/badge.svg
1313
:target: https://codecov.io/gh/bryanyang0528/ksql-python
1414

15+
.. image:: https://pepy.tech/badge/ksql
16+
:target: https://pepy.tech/project/ksql
17+
18+
.. image:: https://pepy.tech/badge/ksql/month
19+
:target: https://pepy.tech/project/ksql/month
20+
21+
.. image:: https://img.shields.io/badge/license-MIT-yellow.svg
22+
:target: https://github.com/bryanyang0528/ksql-python/blob/master/LICENSE
23+
1524
Installation
1625
------------
1726

@@ -124,6 +133,49 @@ This command returns a generator. It can be printed e.g. by reading its values v
124133
{"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null}
125134
{"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}
126135

136+
Query with HTTP/2
137+
^^^^^^^^^^^^^^^^^
138+
Execute queries with the new ``/query-stream`` endpoint. Documented `here <https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#executing-pull-or-push-queries>`_
139+
140+
To execute a sql query use the same syntax as the regular query, with the additional ``use_http2=True`` parameter.
141+
142+
.. code:: python
143+
144+
client.query('select * from table1', use_http2=True)
145+
146+
A generator is returned with the following example response
147+
148+
::
149+
150+
{"queryId":"44d8413c-0018-423d-b58f-3f2064b9a312","columnNames":["ORDER_ID","TOTAL_AMOUNT","CUSTOMER_NAME"],"columnTypes":["INTEGER","DOUBLE","STRING"]}
151+
[3,43.0,"Palo Alto"]
152+
[3,43.0,"Palo Alto"]
153+
[3,43.0,"Palo Alto"]
154+
155+
To terminate the query above use the ``close_query`` call.
156+
Provide the ``queryId`` returned from the ``query`` call.
157+
158+
.. code:: python
159+
160+
client.close_query("44d8413c-0018-423d-b58f-3f2064b9a312")
161+
162+
Insert rows into a Stream with HTTP/2
163+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
164+
165+
Uses the new ``/inserts-stream`` endpoint. See `documentation <https://docs.ksqldb.io/en/0.10.0-ksqldb/developer-guide/ksqldb-rest-api/streaming-endpoint/#inserting-rows-into-an-existing-stream>`_
166+
167+
.. code:: python
168+
169+
rows = [
170+
{"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"},
171+
{"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"}
172+
]
173+
174+
results = self.api_client.inserts_stream("my_stream_name", rows)
175+
176+
An array of object will be returned on success, with the status of each row inserted.
177+
178+
127179
Simplified API
128180
~~~~~~~~~~~~~~
129181

ksql/api.py

+95-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import urllib
99
from copy import deepcopy
1010
from requests import Timeout
11+
from urllib.parse import urlparse
12+
from hyper import HTTPConnection
13+
1114

1215
from ksql.builder import SQLBuilder
1316
from ksql.errors import CreateError, InvalidQueryError, KSQLError
@@ -65,6 +68,42 @@ def ksql(self, ksql_string, stream_properties=None):
6568
res = json.loads(response)
6669
return res
6770

71+
def query2(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
72+
"""
73+
Process streaming incoming data with HTTP/2.
74+
75+
"""
76+
parsed_uri = urlparse(self.url)
77+
78+
logging.debug("KSQL generated: {}".format(query_string))
79+
sql_string = self._validate_sql_string(query_string)
80+
body = {"sql": sql_string}
81+
if stream_properties:
82+
body["properties"] = stream_properties
83+
else:
84+
body["properties"] = {}
85+
86+
with HTTPConnection(parsed_uri.netloc) as connection:
87+
streaming_response = self._request2(
88+
endpoint="query-stream", body=body, connection=connection
89+
)
90+
start_idle = None
91+
92+
if streaming_response.status == 200:
93+
for chunk in streaming_response.read_chunked():
94+
if chunk != b"\n":
95+
start_idle = None
96+
yield chunk.decode(encoding)
97+
98+
else:
99+
if not start_idle:
100+
start_idle = time.time()
101+
if idle_timeout and time.time() - start_idle > idle_timeout:
102+
print("Ending query because of time out! ({} seconds)".format(idle_timeout))
103+
return
104+
else:
105+
raise ValueError("Return code is {}.".format(streaming_response.status))
106+
68107
def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
69108
"""
70109
Process streaming incoming data.
@@ -93,6 +132,20 @@ def get_request(self, endpoint):
93132
auth = (self.api_key, self.secret) if self.api_key or self.secret else None
94133
return requests.get(endpoint, headers=self.headers, auth=auth)
95134

135+
def _request2(self, endpoint, connection, body, method="POST", encoding="utf-8"):
136+
url = "{}/{}".format(self.url, endpoint)
137+
data = json.dumps(body).encode(encoding)
138+
139+
headers = deepcopy(self.headers)
140+
if self.api_key and self.secret:
141+
base64string = base64.b64encode(bytes("{}:{}".format(self.api_key, self.secret), "utf-8")).decode("utf-8")
142+
headers["Authorization"] = "Basic %s" % base64string
143+
144+
connection.request(method=method.upper(), url=url, headers=headers, body=data)
145+
resp = connection.get_response()
146+
147+
return resp
148+
96149
def _request(self, endpoint, method="POST", sql_string="", stream_properties=None, encoding="utf-8"):
97150
url = "{}/{}".format(self.url, endpoint)
98151

@@ -122,10 +175,51 @@ def _request(self, endpoint, method="POST", sql_string="", stream_properties=Non
122175
raise http_error
123176
else:
124177
logging.debug("content: {}".format(content))
125-
raise KSQLError(e=content.get("message"), error_code=content.get("error_code"))
178+
raise KSQLError(content.get("message"), content.get("error_code"), content.get("stackTrace"))
126179
else:
127180
return r
128181

182+
def close_query(self, query_id):
183+
body = {"queryId": query_id}
184+
data = json.dumps(body).encode("utf-8")
185+
url = "{}/{}".format(self.url, "close-query")
186+
187+
response = requests.post(url=url, data=data)
188+
189+
if response.status_code == 200:
190+
logging.debug("Successfully canceled Query ID: {}".format(query_id))
191+
return True
192+
elif response.status_code == 400:
193+
message = json.loads(response.content)["message"]
194+
logging.debug("Failed canceling Query ID: {}: {}".format(query_id, message))
195+
return False
196+
else:
197+
raise ValueError("Return code is {}.".format(response.status_code))
198+
199+
def inserts_stream(self, stream_name, rows):
200+
body = '{{"target":"{}"}}'.format(stream_name)
201+
for row in rows:
202+
body += '\n{}'.format(json.dumps(row))
203+
204+
parsed_uri = urlparse(self.url)
205+
url = "{}/{}".format(self.url, "inserts-stream")
206+
headers = deepcopy(self.headers)
207+
with HTTPConnection(parsed_uri.netloc) as connection:
208+
connection.request("POST", url, bytes(body, "utf-8"), headers)
209+
response = connection.get_response()
210+
result = response.read()
211+
212+
result_str = result.decode("utf-8")
213+
result_chunks = result_str.split("\n")
214+
return_arr = []
215+
for chunk in result_chunks:
216+
try:
217+
return_arr.append(json.loads(chunk))
218+
except:
219+
pass
220+
221+
return return_arr
222+
129223
@staticmethod
130224
def retry(exceptions, delay=1, max_retries=5):
131225
"""

ksql/client.py

+26-8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from __future__ import print_function
33

44
from ksql.api import SimplifiedAPI
5+
from ksql.utils import process_query_result
56

67

78
class KSQLAPI(object):
@@ -41,14 +42,31 @@ def get_properties(self):
4142
def ksql(self, ksql_string, stream_properties=None):
4243
return self.sa.ksql(ksql_string, stream_properties=stream_properties)
4344

44-
def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
45-
return self.sa.query(
46-
query_string=query_string,
47-
encoding=encoding,
48-
chunk_size=chunk_size,
49-
stream_properties=stream_properties,
50-
idle_timeout=idle_timeout,
51-
)
45+
def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None, use_http2=None, return_objects=None):
46+
if use_http2:
47+
yield from self.sa.query2(
48+
query_string=query_string,
49+
encoding=encoding,
50+
chunk_size=chunk_size,
51+
stream_properties=stream_properties,
52+
idle_timeout=idle_timeout,
53+
)
54+
else:
55+
results = self.sa.query(
56+
query_string=query_string,
57+
encoding=encoding,
58+
chunk_size=chunk_size,
59+
stream_properties=stream_properties,
60+
idle_timeout=idle_timeout,
61+
)
62+
63+
yield from process_query_result(results, return_objects)
64+
65+
def close_query(self, query_id):
66+
return self.sa.close_query(query_id)
67+
68+
def inserts_stream(self, stream_name, rows):
69+
return self.sa.inserts_stream(stream_name, rows)
5270

5371
def create_stream(self, table_name, columns_type, topic, value_format="JSON"):
5472
return self.sa.create_stream(

ksql/utils.py

+44
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import ksql
22
import telnetlib
3+
import json
4+
import re
35

46

57
def check_kafka_available(bootstrap_servers):
@@ -65,3 +67,45 @@ def get_dependent_queries(api_client, stream_name):
6567
write_queries = [query["id"] for query in stream_info["writeQueries"]]
6668

6769
return read_queries, write_queries
70+
71+
72+
def parse_columns(columns_str):
73+
regex = r"(?<!\<)`(?P<name>[A-Z_]+)` (?P<type>[A-z]+)[\<, \"](?!\>)"
74+
result = []
75+
76+
matches = re.finditer(regex, columns_str)
77+
for matchNum, match in enumerate(matches, start=1):
78+
result.append({"name": match.group("name"), "type": match.group("type")})
79+
80+
return result
81+
82+
83+
def process_row(row, column_names):
84+
row = row.replace(",\n", "").replace("]\n", "")
85+
row_obj = json.loads(row)
86+
if 'finalMessage' in row_obj:
87+
return None
88+
column_values = row_obj["row"]["columns"]
89+
index = 0
90+
result = {}
91+
for column in column_values:
92+
result[column_names[index]["name"]] = column
93+
index += 1
94+
95+
return result
96+
97+
98+
def process_query_result(results, return_objects=None):
99+
if return_objects is None:
100+
yield from results
101+
102+
# parse rows into objects
103+
header = next(results)
104+
columns = parse_columns(header)
105+
106+
for result in results:
107+
row_obj = process_row(result, columns)
108+
if row_obj is None:
109+
return
110+
yield row_obj
111+

requirements-dev.txt

+1
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,4 @@ wcwidth==0.2.5
4545
wrapt==1.12.1
4646
yarl==1.4.2
4747
zipp==3.1.0
48+
hyper

requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
requests
22
six
33
urllib3
4+
hyper

setup.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ def get_install_requirements(path):
3232
'install_requires': [
3333
'requests',
3434
'six',
35-
'urllib3'
35+
'urllib3',
36+
'hyper'
3637
],
3738
'zip_safe': False,
3839
}

0 commit comments

Comments
 (0)