Skip to content

Commit a3fc75b

Browse files
Merge branch 'main' into SSL_Cert
2 parents 733b743 + 9e3e02c commit a3fc75b

File tree

4 files changed

+87
-2
lines changed

4 files changed

+87
-2
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,6 @@ ENV/
9999

100100
# mypy
101101
.mypy_cache/
102+
103+
# jetbrains
104+
.idea

ksql/utils.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def parse_columns(columns_str):
8181

8282

8383
def process_row(row, column_names):
84-
row = row.replace(",\n", "").replace("]\n", "")
84+
row = row.replace(",\n", "").replace("]\n", "").rstrip("]")
8585
row_obj = json.loads(row)
8686
if "finalMessage" in row_obj:
8787
return None

test-requirements.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ codecov
33
vcrpy
44
coverage
55
confluent-kafka[avro]
6-
responses
6+
responses

tests/unit-tests/utils_tests.py

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import unittest
2+
3+
import ksql.utils
4+
5+
6+
class TestKSQLUtils(unittest.TestCase):
7+
"""Test case for the client methods."""
8+
9+
def test_process_header(self):
10+
header_str = '[{"header":{"queryId":"query_1643298761990","schema":"`COMPANY_UID` STRING KEY, `USER_UID` STRING KEY, `USER_STATUS_ID` BIGINT KEY, `BONUS_PCT` STRING"}},\n'
11+
actual_columns = ksql.utils.parse_columns(header_str)
12+
expected_columns = [
13+
{"name": "COMPANY_UID", "type": "STRING"},
14+
{"name": "USER_UID", "type": "STRING"},
15+
{"name": "USER_STATUS_ID", "type": "BIGINT"},
16+
{"name": "BONUS_PCT", "type": "STRING"},
17+
]
18+
self.assertEqual(actual_columns, expected_columns)
19+
20+
def test_process_row_with_no_dangling_closing_bracket(self):
21+
columns = [
22+
{"name": "COMPANY_UID", "type": "STRING"},
23+
{"name": "USER_UID", "type": "STRING"},
24+
{"name": "USER_STATUS_ID", "type": "BIGINT"},
25+
{"name": "BONUS_PCT", "type": "STRING"},
26+
]
27+
row = '{"row":{"columns":["f08c77db7","fcafb7c23",11508,"1.10976000000000000000"]}},\n'
28+
29+
actual = ksql.utils.process_row(row, columns)
30+
expected = {
31+
"BONUS_PCT": "1.10976000000000000000",
32+
"COMPANY_UID": "f08c77db7",
33+
"USER_UID": "fcafb7c23",
34+
"USER_STATUS_ID": 11508,
35+
}
36+
self.assertEqual(actual, expected)
37+
38+
def test_process_row_with_dangling_closing_bracket(self):
39+
columns = [
40+
{"name": "COMPANY_UID", "type": "STRING"},
41+
{"name": "USER_UID", "type": "STRING"},
42+
{"name": "USER_STATUS_ID", "type": "BIGINT"},
43+
{"name": "BONUS_PCT", "type": "STRING"},
44+
]
45+
row = '{"row":{"columns":["f08c77db7","fdcacbca1",13120,"1.09760000000000000000"]}}]'
46+
47+
actual = ksql.utils.process_row(row, columns)
48+
expected = {
49+
"BONUS_PCT": "1.09760000000000000000",
50+
"COMPANY_UID": "f08c77db7",
51+
"USER_UID": "fdcacbca1",
52+
"USER_STATUS_ID": 13120,
53+
}
54+
self.assertEqual(actual, expected)
55+
56+
57+
def test_process_query_results(self):
58+
results = (
59+
r
60+
for r in [
61+
'[{"header":{"queryId":"query_1643298761990","schema":"`COMPANY_UID` STRING KEY, `USER_UID` STRING KEY, `USER_STATUS_ID` BIGINT KEY, `BONUS_PCT` STRING"}},\n',
62+
'{"row":{"columns":["f08c77db7","fcafb7c23",11508,"1.10976000000000000000"]}},\n',
63+
'{"row":{"columns":["f08c77db7","fdcacbca1",13120,"1.09760000000000000000"]}}]',
64+
]
65+
)
66+
67+
actual = list(ksql.utils.process_query_result(results, return_objects=True))
68+
expected = [
69+
{
70+
"BONUS_PCT": "1.10976000000000000000",
71+
"COMPANY_UID": "f08c77db7",
72+
"USER_STATUS_ID": 11508,
73+
"USER_UID": "fcafb7c23",
74+
},
75+
{
76+
"BONUS_PCT": "1.09760000000000000000",
77+
"COMPANY_UID": "f08c77db7",
78+
"USER_STATUS_ID": 13120,
79+
"USER_UID": "fdcacbca1",
80+
},
81+
]
82+
self.assertEqual(actual, expected)

0 commit comments

Comments
 (0)