66import logging
77import requests
88import urllib
9+ from copy import deepcopy
910from requests import Timeout
1011
1112from ksql .builder import SQLBuilder
12- from ksql .errors import CreateError , KSQLError , InvalidQueryError
13+ from ksql .errors import CreateError , InvalidQueryError , KSQLError
1314
1415
1516class BaseAPI (object ):
@@ -20,6 +21,9 @@ def __init__(self, url, **kwargs):
2021 self .timeout = kwargs .get ("timeout" , 15 )
2122 self .api_key = kwargs .get ("api_key" )
2223 self .secret = kwargs .get ("secret" )
24+ self .headers = {
25+ 'Content-Type' : 'application/vnd.ksql.v1+json; charset=utf-8' ,
26+ }
2327
2428 def get_timout (self ):
2529 return self .timeout
@@ -87,7 +91,7 @@ def query(self, query_string, encoding="utf-8", chunk_size=128, stream_propertie
8791
8892 def get_request (self , endpoint ):
8993 auth = (self .api_key , self .secret ) if self .api_key or self .secret else None
90- return requests .get (endpoint , auth = auth )
94+ return requests .get (endpoint , headers = self . headers , auth = auth )
9195
9296 def _request (self , endpoint , method = "POST" , sql_string = "" , stream_properties = None , encoding = "utf-8" ):
9397 url = "{}/{}" .format (self .url , endpoint )
@@ -98,22 +102,24 @@ def _request(self, endpoint, method="POST", sql_string="", stream_properties=Non
98102 body = {"ksql" : sql_string }
99103 if stream_properties :
100104 body ["streamsProperties" ] = stream_properties
105+ else :
106+ body ["streamsProperties" ] = {}
101107 data = json .dumps (body ).encode (encoding )
102108
103- headers = { "Accept" : "application/json" , "Content-Type" : "application/json" }
109+ headers = deepcopy ( self . headers )
104110 if self .api_key and self .secret :
105- base64string = base64 .b64encode (bytes ("{}:{}" .format (self .api_key , self .secret ), "utf-8" ))
106- headers ["Authorization" ] = "Basic {} " % base64string
111+ base64string = base64 .b64encode (bytes ("{}:{}" .format (self .api_key , self .secret ), "utf-8" )). decode ( "utf-8" )
112+ headers ["Authorization" ] = "Basic %s " % base64string
107113
108114 req = urllib .request .Request (url = url , data = data , headers = headers , method = method .upper ())
109115
110116 try :
111117 r = urllib .request .urlopen (req , timeout = self .timeout )
112- except urllib .error .HTTPError as e :
118+ except urllib .error .HTTPError as http_error :
113119 try :
114- content = json .loads (e .read ().decode (encoding ))
120+ content = json .loads (http_error .read ().decode (encoding ))
115121 except Exception as e :
116- raise ValueError ( e )
122+ raise http_error
117123 else :
118124 logging .debug ("content: {}" .format (content ))
119125 raise KSQLError (e = content .get ("message" ), error_code = content .get ("error_code" ))
0 commit comments