Skip to content

Commit c5be5fb

Browse files
authored
fix: handle JSON params, files clause in copy into, reserved words (#63)
1 parent ee59b16 commit c5be5fb

File tree

5 files changed

+120
-3
lines changed

5 files changed

+120
-3
lines changed

databend_sqlalchemy/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
NDJSONFormat,
2020
ParquetFormat,
2121
ORCFormat,
22+
AVROFormat,
2223
AmazonS3,
2324
AzureBlobStorage,
2425
GoogleCloudStorage,

databend_sqlalchemy/connector.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
# Many docstrings in this file are based on the PEP, which is in the public domain.
66
import decimal
77
import re
8+
import json
89
from datetime import datetime, date, time, timedelta
910
from databend_sqlalchemy.errors import Error, NotSupportedError
1011

@@ -56,6 +57,8 @@ def escape_item(self, item):
5657
return self.escape_string(item.strftime("%Y-%m-%d %H:%M:%S.%f")) + "::timestamp"
5758
elif isinstance(item, date):
5859
return self.escape_string(item.strftime("%Y-%m-%d")) + "::date"
60+
elif isinstance(item, dict):
61+
return self.escape_string(f'parse_json({json.dumps(item)})')
5962
else:
6063
return self.escape_string(item)
6164

databend_sqlalchemy/databend_dialect.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
"Comment",
8181
"CommentBlock",
8282
"Ident",
83+
"IdentVariable",
8384
"ColumnPosition",
8485
"LiteralString",
8586
"LiteralCodeString",
@@ -119,6 +120,7 @@
119120
"Caret",
120121
"LBrace",
121122
"RBrace",
123+
"Dollar",
122124
"RArrow",
123125
"LongRArrow",
124126
"FatRArrow",
@@ -187,8 +189,10 @@
187189
"BY",
188190
"BROTLI",
189191
"BZ2",
192+
"BLOCK",
190193
"CALL",
191194
"CASE",
195+
"CASE_SENSITIVE",
192196
"CAST",
193197
"CATALOG",
194198
"CATALOGS",
@@ -205,6 +209,7 @@
205209
"CONTINUE",
206210
"CHAR",
207211
"COLUMN",
212+
"COLUMN_MATCH_MODE",
208213
"COLUMNS",
209214
"CHARACTER",
210215
"CONFLICT",
@@ -223,6 +228,7 @@
223228
"DATA",
224229
"DATE",
225230
"DATE_ADD",
231+
"DATE_DIFF",
226232
"DATE_PART",
227233
"DATE_SUB",
228234
"DATE_TRUNC",
@@ -266,6 +272,7 @@
266272
"ENGINE",
267273
"ENGINES",
268274
"EPOCH",
275+
"MICROSECOND",
269276
"ERROR_ON_COLUMN_COUNT_MISMATCH",
270277
"ESCAPE",
271278
"EXCEPTION_BACKTRACE",
@@ -295,6 +302,7 @@
295302
"FORMAT_NAME",
296303
"FORMATS",
297304
"FRAGMENTS",
305+
"FRIDAY",
298306
"FROM",
299307
"FULL",
300308
"FUNCTION",
@@ -305,12 +313,14 @@
305313
"GET",
306314
"GENERATED",
307315
"GEOMETRY",
316+
"GEOGRAPHY",
308317
"GLOBAL",
309318
"GRAPH",
310319
"GROUP",
311320
"GZIP",
312321
"HAVING",
313322
"HIGH",
323+
"HILBERT",
314324
"HISTORY",
315325
"HIVE",
316326
"HOUR",
@@ -321,6 +331,7 @@
321331
"IDENTIFIER",
322332
"IF",
323333
"IN",
334+
"INCLUDE_QUERY_ID",
324335
"INCREMENTAL",
325336
"INDEX",
326337
"INFORMATION",
@@ -336,6 +347,9 @@
336347
"INTERVAL",
337348
"INTO",
338349
"INVERTED",
350+
"PREVIOUS_DAY",
351+
"PROCEDURE",
352+
"PROCEDURES",
339353
"IMMEDIATE",
340354
"IS",
341355
"ISODOW",
@@ -346,7 +360,9 @@
346360
"JWT",
347361
"KEY",
348362
"KILL",
363+
"LAST_DAY",
349364
"LATERAL",
365+
"LINEAR",
350366
"LOCATION_PREFIX",
351367
"LOCKS",
352368
"LOGICAL",
@@ -378,6 +394,7 @@
378394
"MODIFY",
379395
"MATERIALIZED",
380396
"MUST_CHANGE_PASSWORD",
397+
"NEXT_DAY",
381398
"NON_DISPLAY",
382399
"NATURAL",
383400
"NETWORK",
@@ -430,6 +447,7 @@
430447
"PRIORITY",
431448
"PURGE",
432449
"PUT",
450+
"PARTIAL",
433451
"QUARTER",
434452
"QUERY",
435453
"QUOTE",
@@ -445,6 +463,7 @@
445463
"REPLACE",
446464
"RETURN_FAILED_ONLY",
447465
"REVERSE",
466+
"SAMPLE",
448467
"MERGE",
449468
"MATCHED",
450469
"MISSING_FIELD_AS",
@@ -475,6 +494,8 @@
475494
"RLIKE",
476495
"RAW",
477496
"OPTIMIZED",
497+
"DECORRELATED",
498+
"SATURDAY",
478499
"SCHEMA",
479500
"SCHEMAS",
480501
"SECOND",
@@ -487,6 +508,7 @@
487508
"UNSET",
488509
"SESSION",
489510
"SETTINGS",
511+
"VARIABLES",
490512
"STAGES",
491513
"STATISTIC",
492514
"SUMMARY",
@@ -497,6 +519,7 @@
497519
"SINGLE",
498520
"SIZE_LIMIT",
499521
"MAX_FILES",
522+
"MONDAY",
500523
"SKIP_HEADER",
501524
"SMALLINT",
502525
"SNAPPY",
@@ -505,6 +528,7 @@
505528
"STAGE",
506529
"SYNTAX",
507530
"USAGE",
531+
"USE_RAW_PATH",
508532
"UPDATE",
509533
"UPLOAD",
510534
"SEQUENCE",
@@ -534,6 +558,7 @@
534558
"TENANTS",
535559
"TENANT",
536560
"THEN",
561+
"THURSDAY",
537562
"TIMESTAMP",
538563
"TIMEZONE_HOUR",
539564
"TIMEZONE_MINUTE",
@@ -548,6 +573,7 @@
548573
"TRUNCATE",
549574
"TRY_CAST",
550575
"TSV",
576+
"TUESDAY",
551577
"TUPLE",
552578
"TYPE",
553579
"UNBOUNDED",
@@ -567,11 +593,12 @@
567593
"USING",
568594
"VACUUM",
569595
"VALUES",
570-
"VALIDATION_MODE",
571596
"VARBINARY",
572597
"VARCHAR",
573598
"VARIANT",
599+
"VARIABLE",
574600
"VERBOSE",
601+
"GRAPHICAL",
575602
"VIEW",
576603
"VIEWS",
577604
"VIRTUAL",
@@ -605,6 +632,7 @@
605632
"UDF",
606633
"HANDLER",
607634
"LANGUAGE",
635+
"STATE",
608636
"TASK",
609637
"TASKS",
610638
"TOP",
@@ -620,6 +648,7 @@
620648
"INTEGRATION",
621649
"ENABLED",
622650
"WEBHOOK",
651+
"WEDNESDAY",
623652
"ERROR_INTEGRATION",
624653
"AUTO_INGEST",
625654
"PIPE_EXECUTION_PAUSED",
@@ -632,8 +661,21 @@
632661
"ABORT",
633662
"ROLLBACK",
634663
"TEMPORARY",
664+
"TEMP",
635665
"SECONDS",
636666
"DAYS",
667+
"DICTIONARY",
668+
"DICTIONARIES",
669+
"PRIMARY",
670+
"SOURCE",
671+
"SQL",
672+
"SUNDAY",
673+
"WAREHOUSES",
674+
"INSPECT",
675+
"ASSIGN",
676+
"NODES",
677+
"UNASSIGN",
678+
"ONLINE",
637679
}
638680

639681

@@ -998,7 +1040,8 @@ def visit_copy_into(self, copy_into, **kw):
9981040

9991041
result = f"COPY INTO {target}" f" FROM {source}"
10001042
if hasattr(copy_into, "files") and isinstance(copy_into.files, list):
1001-
result += f"FILES = {', '.join([f for f in copy_into.files])}"
1043+
quoted_files = [f"'{f}'" for f in copy_into.files]
1044+
result += f" FILES = ({', '.join(quoted_files)})"
10021045
if hasattr(copy_into, "pattern") and copy_into.pattern:
10031046
result += f" PATTERN = '{copy_into.pattern}'"
10041047
if not isinstance(copy_into.file_format, NoneType):

databend_sqlalchemy/dml.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ class Compression(Enum):
250250
DEFLATE = "DEFLATE"
251251
RAW_DEFLATE = "RAW_DEFLATE"
252252
XZ = "XZ"
253+
SNAPPY = "SNAPPY"
253254

254255

255256
class CopyFormat(ClauseElement):
@@ -401,6 +402,30 @@ def __init__(
401402
class ParquetFormat(CopyFormat):
402403
format_type = "PARQUET"
403404

405+
def __init__(
406+
self,
407+
*,
408+
missing_field_as: str = None,
409+
compression: Compression = None,
410+
):
411+
super().__init__()
412+
if missing_field_as:
413+
if missing_field_as not in ["ERROR", "FIELD_DEFAULT"]:
414+
raise TypeError(
415+
'Missing Field As should be "ERROR" or "FIELD_DEFAULT".'
416+
)
417+
self.options["MISSING_FIELD_AS"] = f"{missing_field_as}"
418+
if compression:
419+
if compression not in [Compression.ZSTD, Compression.SNAPPY]:
420+
raise TypeError(
421+
'Compression should be None, ZStd, or Snappy.'
422+
)
423+
self.options["COMPRESSION"] = compression.value
424+
425+
426+
class AVROFormat(CopyFormat):
427+
format_type = "AVRO"
428+
404429
def __init__(
405430
self,
406431
*,
@@ -418,6 +443,19 @@ def __init__(
418443
class ORCFormat(CopyFormat):
419444
format_type = "ORC"
420445

446+
def __init__(
447+
self,
448+
*,
449+
missing_field_as: str = None,
450+
):
451+
super().__init__()
452+
if missing_field_as:
453+
if missing_field_as not in ["ERROR", "FIELD_DEFAULT"]:
454+
raise TypeError(
455+
'Missing Field As should be "ERROR" or "FIELD_DEFAULT".'
456+
)
457+
self.options["MISSING_FIELD_AS"] = f"{missing_field_as}"
458+
421459

422460
class StageClause(ClauseElement, FromClauseRole):
423461
"""Stage Clause"""

tests/test_copy_into.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,38 @@ def test_copy_into_table_sub_select_column_clauses(self):
162162
checkparams={"1_1": "xyz", "IF_1": "NULL", "IF_2": "NOTNULL"},
163163
)
164164

165+
def test_copy_into_table_files(self):
166+
m = MetaData()
167+
tbl = Table(
168+
"atable",
169+
m,
170+
Column("id", Integer),
171+
schema="test_schema",
172+
)
173+
174+
copy_into = CopyIntoTable(
175+
target=tbl,
176+
from_=GoogleCloudStorage(
177+
uri="gcs://some-bucket/a/path/to/files",
178+
credentials="XYZ",
179+
),
180+
files=['one','two','three'],
181+
file_format=CSVFormat(),
182+
)
183+
184+
self.assert_compile(
185+
copy_into,
186+
(
187+
"COPY INTO test_schema.atable"
188+
" FROM 'gcs://some-bucket/a/path/to/files' "
189+
"CONNECTION = ("
190+
" ENDPOINT_URL = 'https://storage.googleapis.com' "
191+
" CREDENTIAL = 'XYZ' "
192+
") FILES = ('one', 'two', 'three')"
193+
" FILE_FORMAT = (TYPE = CSV)"
194+
),
195+
)
196+
165197

166198
class CopyIntoResultTest(fixtures.TablesTest):
167199
run_create_tables = "each"
@@ -204,7 +236,7 @@ def test_copy_into_stage_and_table(self, connection):
204236
eq_(r.rowcount, 1000)
205237
copy_into_results = r.context.copy_into_location_results()
206238
eq_(copy_into_results['rows_unloaded'], 1000)
207-
eq_(copy_into_results['input_bytes'], 16250)
239+
# eq_(copy_into_results['input_bytes'], 16250) # input bytes will differ, the table is random
208240
# eq_(copy_into_results['output_bytes'], 4701) # output bytes differs
209241

210242
# now copy into table

0 commit comments

Comments
 (0)