forked from snowflakedb/snowflake-connector-python
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpandas_tools.py
161 lines (141 loc) · 7.36 KB
/
pandas_tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import os
import string
import random
from tempfile import TemporaryDirectory
from typing import Optional, Sequence, TypeVar, Iterator, Tuple, Union, Iterable
from snowflake.connector import ProgrammingError
from snowflake.connector.options import pandas
MYPY = False
if MYPY: # from typing import TYPE_CHECKING once 3.5 is deprecated
from .connection import SnowflakeConnection
try:
import sqlalchemy
except ImportError:
sqlalchemy = None
T = TypeVar('T', bound=Sequence)
def chunk_helper(lst: T, n: int) -> Iterator[Tuple[int, T]]:
"""Helper generator to chunk a sequence efficiently with current index like if enumerate was called on sequence."""
for i in range(0, len(lst), n):
yield int(i / n), lst[i:i + n]
def write_pandas(conn: 'SnowflakeConnection',
df: 'pandas.DataFrame',
table_name: str,
database: Optional[str] = None,
schema: Optional[str] = None,
chunk_size: Optional[int] = None,
compression: str = 'gzip',
on_error: str = 'abort_statement',
parallel: int = 4
) -> Tuple[bool, int, int,
Sequence[Tuple[str, str, int, int, int, int, Optional[str], Optional[int],
Optional[int], Optional[str]]]]:
"""Allows users to most efficiently write back a pandas DataFrame to Snowflake.
It works by dumping the DataFrame into Parquet files, uploading them and finally copying their data into the table.
Returns whether all files were ingested correctly, number of chunks uploaded, and number of rows ingested
with all of the COPY INTO command's output for debugging purposes.
Example usage:
import pandas
from snowflake.connector.pandas_tools import write_pandas
df = pandas.DataFrame([('Mark', 10), ('Luke', 20)], columns=['name', 'balance'])
success, nchunks, nrows, _ = write_pandas(cnx, df, 'customers')
Args:
conn: Connection to be used to communicate with Snowflake.
df: Dataframe we'd like to write back.
table_name: Table name where we want to insert into.
database: Database schema and table is in, if not provided the default one will be used (Default value = None).
schema: Schema table is in, if not provided the default one will be used (Default value = None).
chunk_size: Number of elements to be inserted once, if not provided all elements will be dumped once
(Default value = None).
compression: The compression used on the Parquet files, can only be gzip, or snappy. Gzip gives supposedly a
better compression, while snappy is faster. Use whichever is more appropriate (Default value = 'gzip').
on_error: Action to take when COPY INTO statements fail, default follows documentation at:
https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#copy-options-copyoptions
(Default value = 'abort_statement').
parallel: Number of threads to be used when uploading chunks, default follows documentation at:
https://docs.snowflake.com/en/sql-reference/sql/put.html#optional-parameters (Default value = 4).
Returns:
Returns the COPY INTO command's results to verify ingestion in the form of a tuple of whether all chunks were
ingested correctly, # of chunks, # of ingested rows, and ingest's output.
"""
if database is not None and schema is None:
raise ProgrammingError("Schema has to be provided to write_pandas when a database is provided")
# This dictionary maps the compression algorithm to Snowflake put copy into command type
# https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#type-parquet
compression_map = {
'gzip': 'auto',
'snappy': 'snappy'
}
if compression not in compression_map.keys():
raise ProgrammingError("Invalid compression '{}', only acceptable values are: {}".format(
compression,
compression_map.keys()
))
location = ((('"' + database + '".') if database else '') +
(('"' + schema + '".') if schema else '') +
('"' + table_name + '"'))
if chunk_size is None:
chunk_size = len(df)
cursor = conn.cursor()
stage_name = None # Forward declaration
while True:
try:
stage_name = ''.join(random.choice(string.ascii_lowercase) for _ in range(5))
cursor.execute('create temporary stage /* Python:snowflake.connector.pandas_tools.write_pandas() */ '
'"{stage_name}"'.format(stage_name=stage_name), _is_internal=True).fetchall()
break
except ProgrammingError as pe:
if pe.msg.endswith('already exists.'):
continue
raise
with TemporaryDirectory() as tmp_folder:
for i, chunk in chunk_helper(df, chunk_size):
chunk_path = os.path.join(tmp_folder, 'file{}.txt'.format(i))
# Dump chunk into parquet file
chunk.to_parquet(chunk_path, compression=compression)
# Upload parquet file
cursor.execute('PUT /* Python:snowflake.connector.pandas_tools.write_pandas() */ '
'file://{path} @"{stage_name}" PARALLEL={parallel}'.format(
path=chunk_path,
stage_name=stage_name,
parallel=parallel
), _is_internal=True)
# Remove chunk file
os.remove(chunk_path)
copy_results = cursor.execute((
'COPY INTO {location} /* Python:snowflake.connector.pandas_tools.write_pandas() */ '
'FROM @"{stage_name}" FILE_FORMAT=(TYPE=PARQUET COMPRESSION={compression}) '
'MATCH_BY_COLUMN_NAME=CASE_SENSITIVE PURGE=TRUE ON_ERROR={on_error}'
).format(
location=location,
stage_name=stage_name,
compression=compression_map[compression],
on_error=on_error
), _is_internal=True).fetchall()
cursor.close()
return (all((e[1] == 'LOADED' for e in copy_results)),
len(copy_results),
sum((e[3] for e in copy_results)),
copy_results)
def pd_writer(table: pandas.io.sql.SQLTable,
conn: Union['sqlalchemy.engine.Engine', 'sqlalchemy.engine.Connection'],
keys: Iterable,
data_iter: Iterable) -> None:
"""This is a wrapper on top of write_pandas to make it compatible with to_sql method in pandas.
Example usage:
import pandas as pd
from snowflake.connector.pandas_tools import pd_writer
sf_connector_version_df = pd.DataFrame([('snowflake-connector-python', '1.0')], columns=['NAME', 'NEWEST_VERSION'])
sf_connector_version_df.to_sql('driver_versions', engine, index=False, method=pd_writer)
Args:
table: Pandas package's table object.
conn: SQLAlchemy engine object to talk to Snowflake.
keys: Column names that we are trying to insert.
data_iter: Iterator over the rows.
"""
sf_connection = conn.connection.connection
df = pandas.DataFrame(data_iter, columns=keys)
write_pandas(conn=sf_connection,
df=df,
# Note: Our sqlalchemy connector creates tables case insensitively
table_name=table.name.upper(),
schema=table.schema)