6
6
import logging
7
7
import requests
8
8
import urllib
9
+ from copy import deepcopy
9
10
from requests import Timeout
10
11
11
12
from ksql .builder import SQLBuilder
12
- from ksql .errors import CreateError , KSQLError , InvalidQueryError
13
+ from ksql .errors import CreateError , InvalidQueryError , KSQLError
13
14
14
15
15
16
class BaseAPI (object ):
@@ -20,6 +21,9 @@ def __init__(self, url, **kwargs):
20
21
self .timeout = kwargs .get ("timeout" , 15 )
21
22
self .api_key = kwargs .get ("api_key" )
22
23
self .secret = kwargs .get ("secret" )
24
+ self .headers = {
25
+ 'Content-Type' : 'application/vnd.ksql.v1+json; charset=utf-8' ,
26
+ }
23
27
24
28
def get_timout (self ):
25
29
return self .timeout
@@ -86,7 +90,8 @@ def query(self, query_string, encoding="utf-8", chunk_size=128, stream_propertie
86
90
raise ValueError ("Return code is {}." .format (streaming_response .status_code ))
87
91
88
92
def get_request (self , endpoint ):
89
- return requests .get (endpoint , auth = (self .api_key , self .secret ))
93
+ auth = (self .api_key , self .secret ) if self .api_key or self .secret else None
94
+ return requests .get (endpoint , headers = self .headers , auth = auth )
90
95
91
96
def _request (self , endpoint , method = "POST" , sql_string = "" , stream_properties = None , encoding = "utf-8" ):
92
97
url = "{}/{}" .format (self .url , endpoint )
@@ -97,22 +102,24 @@ def _request(self, endpoint, method="POST", sql_string="", stream_properties=Non
97
102
body = {"ksql" : sql_string }
98
103
if stream_properties :
99
104
body ["streamsProperties" ] = stream_properties
105
+ else :
106
+ body ["streamsProperties" ] = {}
100
107
data = json .dumps (body ).encode (encoding )
101
108
102
- headers = { "Accept" : "application/json" , "Content-Type" : "application/json" }
109
+ headers = deepcopy ( self . headers )
103
110
if self .api_key and self .secret :
104
- base64string = base64 .b64encode (bytes ("{}:{}" .format (self .api_key , self .secret ), "utf-8" ))
105
- headers ["Authorization" ] = "Basic {} " % base64string
111
+ base64string = base64 .b64encode (bytes ("{}:{}" .format (self .api_key , self .secret ), "utf-8" )). decode ( "utf-8" )
112
+ headers ["Authorization" ] = "Basic %s " % base64string
106
113
107
114
req = urllib .request .Request (url = url , data = data , headers = headers , method = method .upper ())
108
115
109
116
try :
110
117
r = urllib .request .urlopen (req , timeout = self .timeout )
111
- except urllib .error .HTTPError as e :
118
+ except urllib .error .HTTPError as http_error :
112
119
try :
113
- content = json .loads (e .read ().decode (encoding ))
120
+ content = json .loads (http_error .read ().decode (encoding ))
114
121
except Exception as e :
115
- raise ValueError ( e )
122
+ raise http_error
116
123
else :
117
124
logging .debug ("content: {}" .format (content ))
118
125
raise KSQLError (e = content .get ("message" ), error_code = content .get ("error_code" ))
0 commit comments