Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit b83a72a

Browse files
authored
Merge pull request #143 from datafold/config
Specify data-diff arguments using config files
2 parents 68a9d3c + f50bd64 commit b83a72a

File tree

14 files changed

+324
-79
lines changed

14 files changed

+324
-79
lines changed

README.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,57 @@ Options:
201201
- `--bisection-factor` - Segments per iteration. When set to 2, it performs binary search.
202202
- `--bisection-threshold` - Minimal bisection threshold. i.e. maximum size of pages to diff locally.
203203
- `-j` or `--threads` - Number of worker threads to use per database. Default=1.
204+
- `--conf`, `--run` - Specify the run and configuration from a TOML file. (see below)
205+
206+
### How to use with a configuration file
207+
208+
Data-diff lets you load the configuration for a run from a TOML file.
209+
210+
Reasons to use a configuration file:
211+
212+
- Convenience - Set-up the parameters for diffs that need to run often
213+
214+
- Easier and more readable - you can define the database connection settings as config values, instead of in a URI.
215+
216+
- Gives you fine-grained control over the settings switches, without requiring any Python code.
217+
218+
Use `--conf` to specify that path to the configuration file. data-diff will load the settings from `run.default`, if it's defined.
219+
220+
Then you can, optionally, use `--run` to choose to load the settings of a specific run, and override the settings `run.default`. (all runs extend `run.default`, like inheritance).
221+
222+
Finally, CLI switches have the final say, and will override the settings defined by the configuration file, and the current run.
223+
224+
Example TOML file:
225+
226+
```toml
227+
# Specify the connection params to the test database.
228+
[database.test_postgresql]
229+
driver = "postgresql"
230+
user = "postgres"
231+
password = "Password1"
232+
233+
# Specify the default run params
234+
[run.default]
235+
update_column = "timestamp"
236+
verbose = true
237+
238+
# Specify params for a run 'test_diff'.
239+
[run.test_diff]
240+
verbose = false
241+
# Source 1 ("left")
242+
1.database = "test_postgresql" # Use options from database.test_postgresql
243+
1.table = "rating"
244+
# Source 2 ("right")
245+
2.database = "postgresql://postgres:Password1@/" # Use URI like in the CLI
246+
2.table = "rating_del1"
247+
```
248+
249+
In this example, running `data-diff --conf myconfig.toml --run test_diff` will compare between `rating` and `rating_del1`.
250+
It will use the `timestamp` column as the update column, as specified in `run.default`. However, it won't be verbose, since that
251+
flag is overwritten to `false`.
252+
253+
Running it with `data-diff --conf myconfig.toml --run test_diff -v` will set verbose back to `true`.
254+
204255

205256
## How to use from Python
206257

data_diff/__main__.py

Lines changed: 80 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from copy import deepcopy
12
import sys
23
import time
34
import json
@@ -10,8 +11,9 @@
1011
DEFAULT_BISECTION_THRESHOLD,
1112
DEFAULT_BISECTION_FACTOR,
1213
)
13-
from .databases.connect import connect_to_uri
14+
from .databases.connect import connect
1415
from .parse_time import parse_time_before_now, UNITS_STR, ParseError
16+
from .config import apply_config_from_file
1517

1618
import rich
1719
import click
@@ -24,21 +26,28 @@
2426
"-": "red",
2527
}
2628

29+
def _remove_passwords_in_dict(d: dict):
30+
for k, v in d.items():
31+
if k == 'password':
32+
d[k] = '*' * len(v)
33+
elif isinstance(v, dict):
34+
_remove_passwords_in_dict(v)
35+
2736

2837
@click.command()
29-
@click.argument("db1_uri")
30-
@click.argument("table1_name")
31-
@click.argument("db2_uri")
32-
@click.argument("table2_name")
33-
@click.option("-k", "--key-column", default="id", help="Name of primary key column")
38+
@click.argument("database1", required=False)
39+
@click.argument("table1", required=False)
40+
@click.argument("database2", required=False)
41+
@click.argument("table2", required=False)
42+
@click.option("-k", "--key-column", default=None, help="Name of primary key column. Default='id'.")
3443
@click.option("-t", "--update-column", default=None, help="Name of updated_at/last_updated column")
3544
@click.option("-c", "--columns", default=[], multiple=True, help="Names of extra columns to compare")
3645
@click.option("-l", "--limit", default=None, help="Maximum number of differences to find")
37-
@click.option("--bisection-factor", default=DEFAULT_BISECTION_FACTOR, help="Segments per iteration")
46+
@click.option("--bisection-factor", default=None, help=f"Segments per iteration. Default={DEFAULT_BISECTION_FACTOR}.")
3847
@click.option(
3948
"--bisection-threshold",
40-
default=DEFAULT_BISECTION_THRESHOLD,
41-
help="Minimal bisection threshold. Below it, data-diff will download the data and compare it locally.",
49+
default=None,
50+
help=f"Minimal bisection threshold. Below it, data-diff will download the data and compare it locally. Default={DEFAULT_BISECTION_THRESHOLD}.",
4251
)
4352
@click.option(
4453
"--min-age",
@@ -57,16 +66,32 @@
5766
@click.option(
5867
"-j",
5968
"--threads",
60-
default="1",
69+
default=None,
6170
help="Number of worker threads to use per database. Default=1. "
6271
"A higher number will increase performance, but take more capacity from your database. "
6372
"'serial' guarantees a single-threaded execution of the algorithm (useful for debugging).",
6473
)
65-
def main(
66-
db1_uri,
67-
table1_name,
68-
db2_uri,
69-
table2_name,
74+
@click.option(
75+
"--conf",
76+
default=None,
77+
help="Path to a configuration.toml file, to provide a default configuration, and a list of possible runs.",
78+
)
79+
@click.option(
80+
"--run",
81+
default=None,
82+
help="Name of run-configuration to run. If used, CLI arguments for database and table must be omitted.",
83+
)
84+
def main(conf, run, **kw):
85+
if conf:
86+
kw = apply_config_from_file(conf, run, kw)
87+
return _main(**kw)
88+
89+
90+
def _main(
91+
database1,
92+
table1,
93+
database2,
94+
table2,
7095
key_column,
7196
update_column,
7297
columns,
@@ -82,35 +107,53 @@ def main(
82107
threads,
83108
keep_column_case,
84109
json_output,
110+
threads1=None,
111+
threads2=None,
112+
__conf__=None,
85113
):
86-
if limit and stats:
87-
print("Error: cannot specify a limit when using the -s/--stats switch")
88-
return
114+
89115
if interactive:
90116
debug = True
91117

92118
if debug:
93119
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=DATE_FORMAT)
120+
# XXX Temporarily commented out, until we remove the passwords from URIs as well. See issue #150.
121+
# if __conf__:
122+
# __conf__ = deepcopy(__conf__)
123+
# _remove_passwords_in_dict(__conf__)
124+
# logging.debug(f"Applied run configuration: {__conf__}")
94125
elif verbose:
95126
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT)
96127

128+
if limit and stats:
129+
logging.error("Cannot specify a limit when using the -s/--stats switch")
130+
return
131+
132+
key_column = key_column or "id"
133+
if bisection_factor is None:
134+
bisection_factor = DEFAULT_BISECTION_FACTOR
135+
if bisection_threshold is None:
136+
bisection_threshold = DEFAULT_BISECTION_THRESHOLD
137+
97138
threaded = True
98-
if threads is not None:
99-
if threads.lower() == "serial":
100-
threaded = False
101-
threads = 1
102-
else:
103-
try:
104-
threads = int(threads)
105-
except ValueError:
106-
logging.error("Error: threads must be a number, 'auto', or 'serial'.")
107-
return
108-
if threads < 1:
109-
logging.error("Error: threads must be >= 1")
110-
return
111-
112-
db1 = connect_to_uri(db1_uri, threads)
113-
db2 = connect_to_uri(db2_uri, threads)
139+
if threads is None:
140+
threads = 1
141+
elif isinstance(threads, str) and threads.lower() == "serial":
142+
assert not (threads1 or threads2)
143+
threaded = False
144+
threads = 1
145+
else:
146+
try:
147+
threads = int(threads)
148+
except ValueError:
149+
logging.error("Error: threads must be a number, or 'serial'.")
150+
return
151+
if threads < 1:
152+
logging.error("Error: threads must be >= 1")
153+
return
154+
155+
db1 = connect(database1, threads1 or threads)
156+
db2 = connect(database2, threads2 or threads)
114157

115158
if interactive:
116159
db1.enable_interactive()
@@ -128,8 +171,8 @@ def main(
128171
logging.error("Error while parsing age expression: %s" % e)
129172
return
130173

131-
table1 = TableSegment(db1, db1.parse_table_name(table1_name), key_column, update_column, columns, **options)
132-
table2 = TableSegment(db2, db2.parse_table_name(table2_name), key_column, update_column, columns, **options)
174+
table1_seg = TableSegment(db1, db1.parse_table_name(table1), key_column, update_column, columns, **options)
175+
table2_seg = TableSegment(db2, db2.parse_table_name(table2), key_column, update_column, columns, **options)
133176

134177
differ = TableDiffer(
135178
bisection_factor=bisection_factor,
@@ -138,7 +181,7 @@ def main(
138181
max_threadpool_size=threads and threads * 2,
139182
debug=debug,
140183
)
141-
diff_iter = differ.diff_tables(table1, table2)
184+
diff_iter = differ.diff_tables(table1_seg, table2_seg)
142185

143186
if limit:
144187
diff_iter = islice(diff_iter, int(limit))

data_diff/config.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
from typing import Any, Dict
2+
import toml
3+
4+
5+
class ConfigParseError(Exception):
6+
pass
7+
8+
9+
def is_uri(s: str) -> bool:
10+
return "://" in s
11+
12+
13+
def _apply_config(config: Dict[str, Any], run_name: str, kw: Dict[str, Any]):
14+
# Load config
15+
databases = config.pop("database", {})
16+
runs = config.pop("run", {})
17+
if config:
18+
raise ConfigParseError(f"Unknown option(s): {config}")
19+
20+
# Init run_args
21+
run_args = runs.get("default") or {}
22+
if run_name:
23+
if run_name not in runs:
24+
raise ConfigParseError(f"Cannot find run '{run_name}' in configuration.")
25+
run_args.update(runs[run_name])
26+
else:
27+
run_name = "default"
28+
29+
# Process databases + tables
30+
for index in "12":
31+
args = run_args.pop(index, {})
32+
for attr in ("database", "table"):
33+
if attr not in args:
34+
raise ConfigParseError(f"Running 'run.{run_name}': Connection #{index} in missing attribute '{attr}'.")
35+
36+
database = args.pop("database")
37+
table = args.pop("table")
38+
threads = args.pop("threads", None)
39+
if args:
40+
raise ConfigParseError(f"Unexpected attributes for connection #{index}: {args}")
41+
42+
if not is_uri(database):
43+
if database not in databases:
44+
raise ConfigParseError(
45+
f"Database '{database}' not found in list of databases. Available: {list(databases)}."
46+
)
47+
database = dict(databases[database])
48+
assert isinstance(database, dict)
49+
if "driver" not in database:
50+
raise ConfigParseError(f"Database '{database}' did not specify a driver.")
51+
52+
run_args[f"database{index}"] = database
53+
run_args[f"table{index}"] = table
54+
if threads is not None:
55+
run_args[f"threads{index}"] = int(threads)
56+
57+
# Update keywords
58+
new_kw = dict(kw) # Set defaults
59+
new_kw.update(run_args) # Apply config
60+
new_kw.update({k: v for k, v in kw.items() if v}) # Apply non-empty defaults
61+
62+
new_kw["__conf__"] = run_args
63+
64+
return new_kw
65+
66+
67+
def apply_config_from_file(path: str, run_name: str, kw: Dict[str, Any]):
68+
with open(path) as f:
69+
return _apply_config(toml.load(f), run_name, kw)
70+
71+
72+
def apply_config_from_string(toml_config: str, run_name: str, kw: Dict[str, Any]):
73+
return _apply_config(toml.loads(toml_config), run_name, kw)

data_diff/databases/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ def __init__(self, thread_count=1):
258258
self._init_error = None
259259
self._queue = ThreadPoolExecutor(thread_count, initializer=self.set_conn)
260260
self.thread_local = threading.local()
261+
logger.info(f"[{self.name}] Starting a threadpool, size={thread_count}.")
261262

262263
def set_conn(self):
263264
assert not hasattr(self.thread_local, "conn")

data_diff/databases/connect.py

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,46 @@ def connect_to_uri(db_uri: str, thread_count: Optional[int] = 1) -> Database:
116116
kw = matcher.match_path(dsn)
117117

118118
if scheme == "bigquery":
119-
return cls(dsn.host, **kw)
119+
kw["project"] = dsn.host
120+
return cls(**kw)
121+
122+
if scheme == "snowflake":
123+
kw["account"] = dsn.host
124+
assert not dsn.port
125+
kw["user"] = dsn.user
126+
kw["password"] = dsn.password
127+
else:
128+
kw["host"] = dsn.host
129+
kw["port"] = dsn.port
130+
kw["user"] = dsn.user
131+
if dsn.password:
132+
kw["password"] = dsn.password
133+
kw = {k: v for k, v in kw.items() if v is not None}
120134

121135
if issubclass(cls, ThreadedDatabase):
122-
return cls(dsn.host, dsn.port, dsn.user, dsn.password, thread_count=thread_count, **kw)
136+
return cls(thread_count=thread_count, **kw)
123137

124-
return cls(dsn.host, dsn.port, dsn.user, dsn.password, **kw)
138+
return cls(**kw)
139+
140+
141+
def connect_with_dict(d, thread_count):
142+
d = dict(d)
143+
driver = d.pop("driver")
144+
try:
145+
matcher = MATCH_URI_PATH[driver]
146+
except KeyError:
147+
raise NotImplementedError(f"Driver {driver} currently not supported")
148+
149+
cls = matcher.database_cls
150+
if issubclass(cls, ThreadedDatabase):
151+
return cls(thread_count=thread_count, **d)
152+
153+
return cls(**d)
154+
155+
156+
def connect(x, thread_count):
157+
if isinstance(x, str):
158+
return connect_to_uri(x, thread_count)
159+
elif isinstance(x, dict):
160+
return connect_with_dict(x, thread_count)
161+
raise RuntimeError(x)

data_diff/databases/database_types.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ class ColType:
1818
pass
1919

2020

21+
class IKey(ABC):
22+
"Interface for ColType, for using a column as a key in data-diff"
23+
python_type: type
24+
25+
2126
@dataclass
2227
class PrecisionType(ColType):
2328
precision: int

0 commit comments

Comments
 (0)