Skip to content

Commit 7e9b418

Browse files
Merge pull request bryanyang0528#67 from bryanyang0528/fixed_unittests
Fixed unittests
2 parents e39a9ca + 5424728 commit 7e9b418

40 files changed

+2479
-524
lines changed

Diff for: ksql/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11

22
__package_name__ = "ksql"
3-
__ksql_server_version__ = "5.0.0-SNAPSHOT"
3+
__ksql_server_version__ = "0.10.1"
44
__ksql_api_version__ = "0.1.2"
55
__version__ = __ksql_server_version__ + "." + __ksql_api_version__
66

Diff for: ksql/api.py

+36-27
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import time
2+
3+
import base64
14
import functools
25
import json
3-
import time
46
import logging
5-
67
import requests
8+
import urllib
79
from requests import Timeout
810

911
from ksql.builder import SQLBuilder
@@ -32,17 +34,14 @@ def _validate_sql_string(sql_string):
3234
return sql_string
3335

3436
@staticmethod
35-
def _raise_for_status(r):
36-
try:
37-
r_json = r.json()
38-
except ValueError:
39-
r.raise_for_status()
40-
if r.status_code != 200:
37+
def _raise_for_status(r, response):
38+
r_json = json.loads(response)
39+
if r.getcode() != 200:
4140
# seems to be the new API behavior
4241
if r_json.get('@type') == 'statement_error' or r_json.get('@type') == 'generic_error':
4342
error_message = r_json['message']
4443
error_code = r_json['error_code']
45-
stackTrace = r_json['stackTrace']
44+
stackTrace = r_json['stack_trace']
4645
raise KSQLError(error_message, error_code, stackTrace)
4746
else:
4847
raise KSQLError("Unknown Error: {}".format(r.content))
@@ -58,9 +57,10 @@ def _raise_for_status(r):
5857

5958
def ksql(self, ksql_string, stream_properties=None):
6059
r = self._request(endpoint='ksql', sql_string=ksql_string, stream_properties=stream_properties)
61-
self._raise_for_status(r)
62-
r = r.json()
63-
return r
60+
response = r.read().decode('utf-8')
61+
self._raise_for_status(r, response)
62+
res = json.loads(response)
63+
return res
6464

6565
def query(self, query_string, encoding='utf-8', chunk_size=128, stream_properties=None, idle_timeout=None):
6666
"""
@@ -69,8 +69,9 @@ def query(self, query_string, encoding='utf-8', chunk_size=128, stream_propertie
6969
"""
7070
streaming_response = self._request(endpoint='query', sql_string=query_string, stream_properties=stream_properties)
7171
start_idle = None
72-
if streaming_response.status_code == 200:
73-
for chunk in streaming_response.iter_content(chunk_size=chunk_size):
72+
73+
if streaming_response.code == 200:
74+
for chunk in streaming_response:
7475
if chunk != b'\n':
7576
start_idle = None
7677
yield chunk.decode(encoding)
@@ -86,7 +87,7 @@ def query(self, query_string, encoding='utf-8', chunk_size=128, stream_propertie
8687
def get_request(self, endpoint):
8788
return requests.get(endpoint, auth=(self.api_key, self.secret))
8889

89-
def _request(self, endpoint, sql_string, method='post', stream_properties=None):
90+
def _request(self, endpoint, method='POST', sql_string='', stream_properties=None, encoding='utf-8'):
9091
url = '{}/{}'.format(self.url, endpoint)
9192

9293
logging.debug("KSQL generated: {}".format(sql_string))
@@ -97,28 +98,36 @@ def _request(self, endpoint, sql_string, method='post', stream_properties=None):
9798
}
9899
if stream_properties:
99100
body['streamsProperties'] = stream_properties
100-
data = json.dumps(body)
101+
data = json.dumps(body).encode(encoding)
101102

102103
headers = {
103104
"Accept": "application/json",
104105
"Content-Type": "application/json"
105106
}
107+
if self.api_key and self.secret:
108+
base64string = base64.b64encode('{}:{}' % (self.api_key, self.secret))
109+
headers["Authorization"] = "Basic {}" % base64string
106110

107-
if endpoint == 'query':
108-
stream = True
109-
else:
110-
stream = False
111-
112-
r = requests.request(
113-
method=method,
111+
req = urllib.request.Request(
114112
url=url,
115113
data=data,
116-
timeout=self.timeout,
117114
headers=headers,
118-
stream=stream,
119-
auth=(self.api_key, self.secret))
115+
method=method.upper()
116+
)
120117

121-
return r
118+
try:
119+
r = urllib.request.urlopen(req, timeout=self.timeout)
120+
except urllib.error.HTTPError as e:
121+
try:
122+
content = json.loads(e.read().decode(encoding) )
123+
except Exception as e:
124+
raise ValueError(e)
125+
else:
126+
logging.debug("content: {}".format(content))
127+
raise KSQLError(e=content.get('message'),
128+
error_code=content.get('error_code'))
129+
else:
130+
return r
122131

123132
@staticmethod
124133
def retry(exceptions, delay=1, max_retries=5):

Diff for: tests/test_client.py

+9-8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
import unittest
32
import requests
43

@@ -30,7 +29,7 @@ def setUp(self):
3029

3130
def tearDown(self):
3231
if utils.check_kafka_available(self.bootstrap_servers):
33-
utils.drop_all_streams(self.api_client, prefix=self.test_prefix)
32+
utils.drop_all_streams(self.api_client)
3433

3534
def test_get_url(self):
3635
self.assertEqual(self.api_client.get_url(), "http://localhost:8088")
@@ -54,21 +53,22 @@ def test_get_ksql_version_success(self):
5453
@vcr.use_cassette('tests/vcr_cassettes/get_properties.yml')
5554
def test_get_properties(self):
5655
properties = self.api_client.get_properties()
57-
self.assertEqual(properties['ksql.schema.registry.url'], "http://localhost:8081")
56+
property = [i for i in properties if i['name'] == 'ksql.schema.registry.url'][0]
57+
self.assertEqual(property.get('value'), "http://schema-registry:8081")
5858

5959
@vcr.use_cassette('tests/vcr_cassettes/ksql_show_table.yml')
6060
def test_ksql_show_tables(self):
6161
""" Test GET requests """
6262
ksql_string = "show tables;"
6363
r = self.api_client.ksql(ksql_string)
64-
self.assertEqual(r, [{'@type': 'tables', 'statementText': 'show tables;', 'tables': []}])
64+
self.assertEqual(r, [{'@type': 'tables', 'statementText': 'show tables;', 'tables': [], 'warnings': []}])
6565

6666
@vcr.use_cassette('tests/vcr_cassettes/ksql_show_table.yml')
6767
def test_ksql_show_tables_with_no_semicolon(self):
6868
""" Test GET requests """
6969
ksql_string = "show tables"
7070
r = self.api_client.ksql(ksql_string)
71-
self.assertEqual(r, [{'@type': 'tables', 'statementText': 'show tables;', 'tables': []}])
71+
self.assertEqual(r, [{'@type': 'tables', 'statementText': 'show tables;', 'tables': [], 'warnings': []}])
7272

7373
@vcr.use_cassette('tests/vcr_cassettes/ksql_create_stream.yml')
7474
def test_ksql_create_stream(self):
@@ -108,9 +108,10 @@ def test_ksql_create_stream_w_properties(self):
108108
def test_bad_requests(self):
109109
broken_ksql_string = "noi"
110110
with self.assertRaises(KSQLError) as e:
111-
r = self.api_client.ksql(broken_ksql_string)
112-
the_exception = e.exception
113-
self.assertEqual(the_exception.error_code, 40000)
111+
self.api_client.ksql(broken_ksql_string)
112+
113+
exception = e.exception
114+
self.assertEqual(exception.error_code, 40001)
114115

115116
@vcr.use_cassette('tests/vcr_cassettes/ksql_create_stream_by_builder.yml')
116117
def test_ksql_create_stream_by_builder(self):

Diff for: tests/unit-tests/test_api.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,6 @@ def test_base_api_query(self):
1313
stream=True)
1414
base = BaseAPI("http://dummy.org")
1515
result = base.query("so")
16-
for entry in result:
17-
self.assertEqual(entry, "test")
16+
with self.assertRaises(ValueError):
17+
for entry in result:
18+
entry

Diff for: tests/vcr_cassettes/bad_requests.yml

+26-16
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,34 @@ interactions:
22
- request:
33
body: '{"ksql": "noi;"}'
44
headers:
5-
Accept: [application/json]
6-
Accept-Encoding: ['gzip, deflate']
7-
Connection: [keep-alive]
8-
Content-Length: ['16']
9-
Content-Type: [application/json]
10-
User-Agent: [python-requests/2.19.1]
5+
Accept:
6+
- application/json
7+
Connection:
8+
- close
9+
Content-Length:
10+
- '16'
11+
Content-Type:
12+
- application/json
13+
Host:
14+
- localhost:8088
15+
User-Agent:
16+
- Python-urllib/3.6
1117
method: POST
1218
uri: http://localhost:8088/ksql
1319
response:
14-
body: {string: '{"@type":"generic_error","error_code":40000,"message":"line 1:1:
15-
mismatched input ''noi'' expecting {<EOF>, ''('', ''SELECT'', ''VALUES'',
16-
''CREATE'', ''REGISTER'', ''TABLE'', ''INSERT'', ''DESCRIBE'', ''PRINT'',
17-
''EXPLAIN'', ''SHOW'', ''LIST'', ''TERMINATE'', ''LOAD'', ''DROP'', ''SET'',
18-
''EXPORT'', ''UNSET'', ''RUN''}\nCaused by: org.antlr.v4.runtime.InputMismatchException","stackTrace":["io.confluent.ksql.parser.KsqlParser.buildAst(KsqlParser.java:66)","io.confluent.ksql.KsqlEngine.getStatements(KsqlEngine.java:497)","io.confluent.ksql.rest.server.resources.KsqlResource.handleKsqlStatements(KsqlResource.java:171)","sun.reflect.GeneratedMethodAccessor6.invoke(Unknown
19-
Source)","sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)","java.lang.reflect.Method.invoke(Method.java:498)","org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:76)","org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:148)","org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:191)","org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:200)","org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:103)","org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:493)","org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:415)","org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:104)","org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:277)","org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)","org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)","org.glassfish.jersey.internal.Errors.process(Errors.java:316)","org.glassfish.jersey.internal.Errors.process(Errors.java:298)","org.glassfish.jersey.internal.Errors.process(Errors.java:268)","org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)","org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)","org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)","org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:416)","org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:409)","org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:584)","org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:525)","org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:462)","org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)","org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:533)","org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)","org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1595)","org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)","org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1253)","org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)","org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)","org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1564)","org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)","org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1155)","org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)","org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)","org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:169)","org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:219)","org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)","org.eclipse.jetty.server.Server.handle(Server.java:531)","org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:352)","org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)","org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)","org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102)","org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)","org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)","org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)","org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)","org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)","org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)","org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:760)","org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:678)","java.lang.Thread.run(Thread.java:748)"]}'}
20+
body:
21+
string: '{"@type":"statement_error","error_code":40001,"message":"line 1:1:
22+
mismatched input ''noi'' expecting {<EOF>, ''SELECT'', ''CREATE'', ''INSERT'',
23+
''DESCRIBE'', ''PRINT'', ''EXPLAIN'', ''SHOW'', ''LIST'', ''TERMINATE'', ''DROP'',
24+
''SET'', ''UNSET''}","statementText":"noi;","entities":[]}'
2025
headers:
21-
Content-Type: [application/json]
22-
Date: ['Fri, 20 Jul 2018 21:19:08 GMT']
23-
Server: [Jetty(9.4.10.v20180503)]
24-
status: {code: 400, message: Bad Request}
26+
connection:
27+
- close
28+
content-length:
29+
- '261'
30+
content-type:
31+
- application/json
32+
status:
33+
code: 400
34+
message: Bad Request
2535
version: 1

Diff for: tests/vcr_cassettes/get_ksql_server.yml

+19-9
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,27 @@ interactions:
22
- request:
33
body: null
44
headers:
5-
Accept: ['*/*']
6-
Accept-Encoding: ['gzip, deflate']
7-
Connection: [keep-alive]
8-
User-Agent: [python-requests/2.19.1]
5+
Accept:
6+
- '*/*'
7+
Accept-Encoding:
8+
- gzip, deflate
9+
Authorization:
10+
- Basic Tm9uZTpOb25l
11+
Connection:
12+
- keep-alive
13+
User-Agent:
14+
- python-requests/2.18.4
915
method: GET
1016
uri: http://localhost:8088/info
1117
response:
12-
body: {string: '{"KsqlServerInfo":{"version":"5.0.0-SNAPSHOT","kafkaClusterId":"9HvFRIoMSyy1YUxjpOt-gg","ksqlServiceId":"default_"}}'}
18+
body:
19+
string: '{"KsqlServerInfo":{"version":"0.10.1","kafkaClusterId":"DkvN9zaxSjO5NxYIkhjKCQ","ksqlServiceId":"default_"}}'
1320
headers:
14-
Content-Type: [application/vnd.ksql.v1+json]
15-
Date: ['Fri, 20 Jul 2018 20:08:04 GMT']
16-
Server: [Jetty(9.4.10.v20180503)]
17-
status: {code: 200, message: OK}
21+
content-length:
22+
- '108'
23+
content-type:
24+
- application/json
25+
status:
26+
code: 200
27+
message: OK
1828
version: 1

0 commit comments

Comments
 (0)