Skip to content

Commit a0ddf7b

Browse files
authored
pyalisa (#3107)
* pyalisa support py2 * add task.py * implement task.py * move from_env from client to config module * fix response field name * add env:SKYNET_TENANT_ID * add test for task * remove duplicate test case * fix config en/decoding * add test case for task * fix unittest for alisa.config
1 parent 5c48b9b commit a0ddf7b

File tree

7 files changed

+297
-89
lines changed

7 files changed

+297
-89
lines changed

python/runtime/dbapi/pyalisa/client.py

Lines changed: 43 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License
1313

14+
import ast
1415
import json
15-
import os
1616
import random
1717
import string
1818
import time
1919
from enum import Enum
2020

21-
from runtime.dbapi.pyalisa.config import Config
2221
from runtime.dbapi.pyalisa.pop import Pop
2322

2423

@@ -35,15 +34,15 @@ class AlisaTaksStatus(Enum):
3534
ALISA_TASK_ALLOCATE = 11
3635

3736

38-
# used to deal with too many logs.
37+
# used to deal with too many logs
3938
MAX_LOG_NUM = 2000
4039

4140

4241
class Client(object):
4342
"""Client for building kinds of tasks and submitting them to alisa gateway
4443
4544
Args:
46-
config(Config): the config for build the client
45+
config(Config): the Config(runtime.dbapi.pyalisa.config)
4746
"""
4847
def __init__(self, config):
4948
self.config = config # noqa F841
@@ -93,7 +92,6 @@ def create_pyodps_task(self, code, args):
9392
params["Exec"] = self.config.withs["Exec4PyODPS"]
9493
if len(args) > 0:
9594
params["Args"] = args
96-
9795
return self._create_task(params)
9896

9997
def _create_task(self, params):
@@ -168,12 +166,12 @@ def read_logs(self, task_id, offset, w):
168166
params["AlisaTaskId"] = task_id
169167
params["Offset"] = str(offset)
170168
log = self._requet_and_parse_response("GetAlisaTaskLog", params)
171-
rlen = int(log["ReadLen"])
169+
rlen = int(log["readLength"])
172170
if rlen == 0:
173171
return offset
174172
offset += rlen
175-
w.write(log["Content"])
176-
if bool(log["End"]):
173+
w.write(log["logMsg"])
174+
if bool(log["isEnd"]):
177175
return -1
178176
return offset
179177

@@ -205,20 +203,23 @@ def get_results(self, task_id, batch):
205203
if batch <= 0:
206204
raise ValueError("batch should greater than 0")
207205
count = self.count_results(task_id)
208-
result = []
206+
207+
columns, body = [], []
209208
for i in range(0, count, batch):
210209
params = self._base_params()
211210
params["AlisaTaskId"] = task_id
212211
params["Start"] = str(i)
213212
params["Limit"] = str(batch)
214-
r = self._requet_and_parse_response("GetAlisaTaskResult", params)
215-
# TODO(lhw): parse the result like:
216-
# https://github.com/sql-machine-learning/goalisa/blob/68d3aad1344c9e5c0cd35c6556e1f3f2b6ca9db7/alisa.go#L190
217-
result.append(r)
218-
return result
213+
val = self._requet_and_parse_response("GetAlisaTaskResult", params)
214+
header, rows = self._parse_alisa_value(val)
215+
if len(columns) == 0:
216+
columns = header
217+
body.extend(rows)
218+
return {"columns": columns, "body": body}
219219

220220
def stop(self, task_id):
221-
"""Stop given task
221+
"""Stop given task.
222+
NOTE(weiguoz): need to be tested.
222223
223224
Args:
224225
task_id(string): the task to stop
@@ -231,51 +232,38 @@ def stop(self, task_id):
231232
res = self._requet_and_parse_response("StopAlisaTask", params)
232233
return bool(res)
233234

235+
def _parse_alisa_value(self, val):
236+
"""Parse 'returnValue' in alisa response
237+
https://github.com/sql-machine-learning/goalisa/blob/68d3aad1344c9e5c0cd35c6556e1f3f2b6ca9db7/alisa.go#L190
238+
239+
Args:
240+
val: [{u'resultMsg': u'[["Alice","23.8","56000"]]',
241+
u'dataHeader': u'["name::string","age::double","salary::bigint"]'}]
242+
"""
243+
jsval = ast.literal_eval(json.dumps(val))
244+
columns = []
245+
for h in json.loads(jsval['dataHeader']):
246+
nt = h.split("::")
247+
name, typ = (nt[0], nt[1]) if len(nt) == 2 else (h, "string")
248+
columns.append({"name": str(name), "typ": str(typ)})
249+
body = []
250+
for m in json.loads(jsval['resultMsg']):
251+
row = []
252+
for i in ast.literal_eval(json.dumps(m)):
253+
row.append(i)
254+
body.append(row)
255+
return columns, body
256+
234257
def _requet_and_parse_response(self, action, params):
235258
params["Action"] = action
236259
params["ProjectEnv"] = self.config.env["SKYNET_SYSTEM_ENV"]
237260
url = self.config.pop_scheme + "://" + self.config.pop_url
238261
code, buf = Pop.request(url, params, self.config.pop_access_secret)
239262
resp = json.loads(buf)
240263
if code != 200:
241-
raise RuntimeError("%s got a bad result, response=%s" %
242-
(code, buf))
264+
raise RuntimeError("%s got a bad result, request=%s, response=%s" %
265+
(code, params, buf))
266+
if resp['returnCode'] != '0':
267+
raise Exception("returned an error request={}, response={}".format(
268+
params, resp))
243269
return resp["returnValue"]
244-
245-
@staticmethod
246-
def from_env():
247-
"""Build a Client from environment variable
248-
249-
Returns:
250-
a Client instance
251-
"""
252-
if not os.getenv("POP_SECRET"):
253-
return None
254-
conf = Config()
255-
conf.pop_url = os.getenv("POP_URL")
256-
conf.pop_access_id = os.getenv("POP_ID")
257-
conf.pop_access_secret = os.getenv("POP_SECRET")
258-
conf.pop_scheme = "http"
259-
conf.verbose = os.getenv("VERBOSE") == "true"
260-
conf.env = {
261-
"SKYNET_ONDUTY": os.getenv("SKYNET_ONDUTY"),
262-
"SKYNET_ACCESSID": os.getenv("SKYNET_ACCESSID"),
263-
"SKYNET_ACCESSKEY": os.getenv("SKYNET_ACCESSKEY"),
264-
"SKYNET_ENDPOINT": os.getenv("SKYNET_ENDPOINT"),
265-
"SKYNET_SYSTEMID": os.getenv("SKYNET_SYSTEMID"),
266-
"SKYNET_PACKAGEID": os.getenv("SKYNET_PACKAGEID"),
267-
"SKYNET_SYSTEM_ENV": os.getenv("SKYNET_SYSTEM_ENV"),
268-
"SKYNET_BIZDATE": os.getenv("SKYNET_BIZDATE"),
269-
"ALISA_TASK_EXEC_TARGET": os.getenv("ALISA_TASK_EXEC_TARGET"),
270-
}
271-
conf.withs = {
272-
"CustomerId": os.getenv("CustomerId"),
273-
"PluginName": os.getenv("PluginName"),
274-
"Exec": os.getenv("Exec"),
275-
"PluginName4PyODPS": os.getenv("PluginName4PyODPS"),
276-
"Exec4PyODPS": os.getenv("Exec4PyODPS"),
277-
}
278-
conf.curr_project = conf.env["SKYNET_PACKAGEID"]
279-
if len(conf.env["SKYNET_SYSTEMID"]) > 0:
280-
conf.curr_project += "_" + conf.env["SKYNET_SYSTEMID"]
281-
return Client(conf)

python/runtime/dbapi/pyalisa/client_test.py

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@
1616

1717
from runtime import testing
1818
from runtime.dbapi.pyalisa.client import AlisaTaksStatus, Client
19+
from runtime.dbapi.pyalisa.config import Config
1920

2021

2122
@unittest.skipUnless(testing.get_driver() == "alisa", "Skip non-alisa test")
2223
class TestClient(unittest.TestCase):
2324
def test_create_sql_task(self):
24-
ali = Client.from_env()
25+
ali = Client(Config.from_env())
2526
code = "SELECT 2;"
2627
task_id, _ = ali.create_sql_task(code)
2728
self.assertIsNotNone(task_id)
@@ -31,6 +32,7 @@ def test_create_sql_task(self):
3132

3233
# try get result
3334
for _ in range(10):
35+
# to avoid touching the flow-control
3436
time.sleep(1)
3537
status = ali.get_status(task_id)
3638
if ali.completed(status):
@@ -40,24 +42,6 @@ def test_create_sql_task(self):
4042
self.assertGreater(len(result), 0)
4143
break
4244

43-
def test_create_pyodps_task(self):
44-
ali = Client.from_env()
45-
code = """import argparse
46-
if __name__ == "__main__":
47-
input_table_name = args['input_table']
48-
output_table_name = args['output_table']
49-
print(input_table_name)
50-
print(output_table_name)
51-
input_table = o.get_table(input_table_name)
52-
print(input_table.schema)
53-
output_table = o.create_table(output_table_name, input_table.schema)
54-
"""
55-
args = "input_table=table_1 output_table=table_2"
56-
task_id, _ = ali.create_pyodps_task(code, args)
57-
# to avoid touching the flow-control
58-
time.sleep(2)
59-
self.assertIsNotNone(task_id)
60-
6145

6246
if __name__ == "__main__":
6347
unittest.main()

python/runtime/dbapi/pyalisa/config.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import base64
1515
import json
16+
import os
1617
from collections import OrderedDict
1718

1819
from six.moves.urllib.parse import parse_qs, urlparse
@@ -66,7 +67,7 @@ def _encode_json_base64(env):
6667
@staticmethod
6768
def _decode_json_base64(b64env):
6869
padded = b64env + "=" * (len(b64env) % 4)
69-
jstr = base64.urlsafe_b64decode(padded).decode("utf8")
70+
jstr = base64.urlsafe_b64decode(padded.encode('utf8')).decode("utf8")
7071
return json.loads(jstr)
7172

7273
def to_url(self):
@@ -87,3 +88,42 @@ def to_url(self):
8788
)
8889
return ("alisa://%s:%s@%s?scheme=%s&verbose"
8990
"=%s&curr_project=%s&env=%s&with=%s") % parts
91+
92+
@staticmethod
93+
def from_env():
94+
"""Build a Client from environment variable
95+
96+
Returns:
97+
a Client instance
98+
"""
99+
if not os.getenv("POP_SECRET"):
100+
return None
101+
conf = Config()
102+
conf.pop_url = os.getenv("POP_URL")
103+
conf.pop_access_id = os.getenv("POP_ID")
104+
conf.pop_access_secret = os.getenv("POP_SECRET")
105+
conf.pop_scheme = "http"
106+
conf.verbose = os.getenv("VERBOSE") == "true"
107+
conf.env = {
108+
"SKYNET_ONDUTY": os.getenv("SKYNET_ONDUTY"),
109+
"SKYNET_ACCESSID": os.getenv("SKYNET_ACCESSID"),
110+
"SKYNET_ACCESSKEY": os.getenv("SKYNET_ACCESSKEY"),
111+
"SKYNET_ENDPOINT": os.getenv("SKYNET_ENDPOINT"),
112+
"SKYNET_SYSTEMID": os.getenv("SKYNET_SYSTEMID"),
113+
"SKYNET_PACKAGEID": os.getenv("SKYNET_PACKAGEID"),
114+
"SKYNET_SYSTEM_ENV": os.getenv("SKYNET_SYSTEM_ENV"),
115+
"SKYNET_BIZDATE": os.getenv("SKYNET_BIZDATE"),
116+
"SKYNET_TENANT_ID": os.getenv("SKYNET_TENANT_ID"),
117+
"ALISA_TASK_EXEC_TARGET": os.getenv("ALISA_TASK_EXEC_TARGET"),
118+
}
119+
conf.withs = {
120+
"CustomerId": os.getenv("CustomerId"),
121+
"PluginName": os.getenv("PluginName"),
122+
"Exec": os.getenv("Exec"),
123+
"PluginName4PyODPS": os.getenv("PluginName4PyODPS"),
124+
"Exec4PyODPS": os.getenv("Exec4PyODPS"),
125+
}
126+
conf.curr_project = conf.env["SKYNET_PACKAGEID"]
127+
if len(conf.env["SKYNET_SYSTEMID"]) > 0:
128+
conf.curr_project += "_" + conf.env["SKYNET_SYSTEMID"]
129+
return conf

python/runtime/dbapi/pyalisa/config_test.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import unittest
1515

16+
from runtime import testing
1617
from runtime.dbapi.pyalisa.config import Config
1718

1819
test_url = ("alisa://pid:[email protected]/?scheme=http&verbose=true&"
@@ -27,13 +28,16 @@
2728
"sICJQbHVnaW5OYW1lIjogIndwZSIsICJDdXN0b21lcklkIjogIndjZCJ9")
2829

2930

31+
@unittest.skipUnless(testing.get_driver() == "alisa", "Skip non-alisa test")
3032
class TestConfig(unittest.TestCase):
33+
"""We use python2 in alisa, so let's skip the tests in the other drivers.
34+
"""
3135
def test_encode_json_base64(self):
3236
params = dict()
3337
params["key1"] = "val1"
3438
params["key2"] = "val2"
3539
b64 = Config._encode_json_base64(params)
36-
self.assertEqual("eyJrZXkxIjogInZhbDEiLCAia2V5MiI6ICJ2YWwyIn0", b64)
40+
self.assertEqual("eyJrZXkyIjogInZhbDIiLCAia2V5MSI6ICJ2YWwxIn0", b64)
3741

3842
params = Config._decode_json_base64(b64)
3943
self.assertEqual(2, len(params))
@@ -52,9 +56,18 @@ def test_dsn_parsing(self):
5256
self.assertEqual("SKY", cfg.env["SKYNET_ACCESSKEY"])
5357

5458
def test_to_dsn(self):
55-
cfg = Config(test_url)
56-
url = cfg.to_url()
57-
self.assertEqual(test_url, url)
59+
c1 = Config(test_url)
60+
u1 = c1.to_url()
61+
c2 = Config(u1)
62+
self.assertEqual(c1.pop_access_id, c2.pop_access_id)
63+
self.assertEqual(c1.pop_access_secret, c2.pop_access_secret)
64+
self.assertEqual(c1.curr_project, c2.curr_project)
65+
self.assertEqual(c1.scheme, c2.scheme)
66+
self.assertEqual(c1.withs["CustomerId"], c2.withs["CustomerId"])
67+
self.assertEqual(c1.withs["PluginName"], c2.withs["PluginName"])
68+
self.assertEqual(c1.withs["Exec"], c2.withs["Exec"])
69+
self.assertEqual(c1.env["SKYNET_ACCESSKEY"],
70+
c2.env["SKYNET_ACCESSKEY"])
5871

5972
def test_parse_error(self):
6073
# no env and with

python/runtime/dbapi/pyalisa/pop.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@
1414
import base64
1515
import hashlib
1616
import hmac
17-
from encodings import utf_8
17+
import urllib
1818

1919
import requests
20-
from six.moves import urllib
2120

2221

2322
class Pop(object):
@@ -59,10 +58,9 @@ def signature(params, http_method, secret):
5958
ek = Pop.percent_encode(k)
6059
ev = Pop.percent_encode(v)
6160
qry += '&{}={}'.format(ek, ev)
62-
to_sign = '{}&%2F&{}'.format(http_method, Pop.percent_encode(qry[1:]))
63-
bf = bytearray(utf_8.encode(to_sign)[0])
64-
dig = hmac.digest(utf_8.encode(secret + "&")[0], bf, hashlib.sha1)
65-
return utf_8.decode(base64.standard_b64encode(dig))[0]
61+
str = '{}&%2F&{}'.format(http_method, Pop.percent_encode(qry[1:]))
62+
dig = hmac.new(secret + '&', str, hashlib.sha1).digest()
63+
return base64.standard_b64encode(dig)
6664

6765
@staticmethod
6866
def percent_encode(str):
@@ -75,7 +73,7 @@ def percent_encode(str):
7573
Returns:
7674
(string) the encoded param
7775
"""
78-
es = urllib.parse.quote_plus(str)
79-
es = es.replace('+', '%20')
80-
es = es.replace('*', '%2A')
81-
return es.replace('%7E', '~')
76+
s = urllib.quote_plus(str)
77+
s = s.replace('+', '%20')
78+
s = s.replace('*', '%2A')
79+
return s.replace('%7E', '~')

0 commit comments

Comments
 (0)