Skip to content

Commit 19bf7b7

Browse files
authored
fast import: basic python test (#10271)
We did not have any tests on fast_import binary yet. In this PR I have introduced: - `FastImport` class and tools for testing in python - basic test that runs fast import against vanilla postgres and checks that data is there Should be merged after #10251
1 parent 7e4a39e commit 19bf7b7

File tree

4 files changed

+165
-3
lines changed

4 files changed

+165
-3
lines changed

compute_tools/src/bin/fast_import.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ struct Args {
5858
pg_bin_dir: Utf8PathBuf,
5959
#[clap(long)]
6060
pg_lib_dir: Utf8PathBuf,
61+
#[clap(long)]
62+
pg_port: Option<u16>, // port to run postgres on, 5432 is default
6163
}
6264

6365
#[serde_with::serde_as]
@@ -74,6 +76,13 @@ enum EncryptionSecret {
7476
KMS { key_id: String },
7577
}
7678

79+
// copied from pageserver_api::config::defaults::DEFAULT_LOCALE to avoid dependency just for a constant
80+
const DEFAULT_LOCALE: &str = if cfg!(target_os = "macos") {
81+
"C"
82+
} else {
83+
"C.UTF-8"
84+
};
85+
7786
#[tokio::main]
7887
pub(crate) async fn main() -> anyhow::Result<()> {
7988
utils::logging::init(
@@ -97,6 +106,10 @@ pub(crate) async fn main() -> anyhow::Result<()> {
97106
let working_directory = args.working_directory;
98107
let pg_bin_dir = args.pg_bin_dir;
99108
let pg_lib_dir = args.pg_lib_dir;
109+
let pg_port = args.pg_port.unwrap_or_else(|| {
110+
info!("pg_port not specified, using default 5432");
111+
5432
112+
});
100113

101114
// Initialize AWS clients only if s3_prefix is specified
102115
let (aws_config, kms_client) = if args.s3_prefix.is_some() {
@@ -180,7 +193,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
180193
let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded
181194
postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
182195
superuser,
183-
locale: "en_US.UTF-8", // XXX: this shouldn't be hard-coded,
196+
locale: DEFAULT_LOCALE, // XXX: this shouldn't be hard-coded,
184197
pg_version,
185198
initdb_bin: pg_bin_dir.join("initdb").as_ref(),
186199
library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
@@ -197,6 +210,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
197210
let mut postgres_proc = tokio::process::Command::new(pgbin)
198211
.arg("-D")
199212
.arg(&pgdata_dir)
213+
.args(["-p", &format!("{pg_port}")])
200214
.args(["-c", "wal_level=minimal"])
201215
.args(["-c", "shared_buffers=10GB"])
202216
.args(["-c", "max_wal_senders=0"])
@@ -216,6 +230,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
216230
),
217231
])
218232
.env_clear()
233+
.env("LD_LIBRARY_PATH", &pg_lib_dir)
219234
.stdout(std::process::Stdio::piped())
220235
.stderr(std::process::Stdio::piped())
221236
.spawn()
@@ -232,7 +247,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
232247

233248
// Create neondb database in the running postgres
234249
let restore_pg_connstring =
235-
format!("host=localhost port=5432 user={superuser} dbname=postgres");
250+
format!("host=localhost port={pg_port} user={superuser} dbname=postgres");
236251

237252
let start_time = std::time::Instant::now();
238253

@@ -314,6 +329,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
314329
.arg(&source_connection_string)
315330
// how we run it
316331
.env_clear()
332+
.env("LD_LIBRARY_PATH", &pg_lib_dir)
317333
.kill_on_drop(true)
318334
.stdout(std::process::Stdio::piped())
319335
.stderr(std::process::Stdio::piped())
@@ -347,6 +363,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
347363
.arg(&dumpdir)
348364
// how we run it
349365
.env_clear()
366+
.env("LD_LIBRARY_PATH", &pg_lib_dir)
350367
.kill_on_drop(true)
351368
.stdout(std::process::Stdio::piped())
352369
.stderr(std::process::Stdio::piped())

test_runner/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@
1515
"fixtures.compare_fixtures",
1616
"fixtures.slow",
1717
"fixtures.reruns",
18+
"fixtures.fast_import",
1819
)

test_runner/fixtures/fast_import.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import os
2+
import shutil
3+
import subprocess
4+
import tempfile
5+
from collections.abc import Iterator
6+
from pathlib import Path
7+
8+
import pytest
9+
10+
from fixtures.log_helper import log
11+
from fixtures.neon_cli import AbstractNeonCli
12+
from fixtures.pg_version import PgVersion
13+
14+
15+
class FastImport(AbstractNeonCli):
16+
COMMAND = "fast_import"
17+
cmd: subprocess.CompletedProcess[str] | None = None
18+
19+
def __init__(
20+
self,
21+
extra_env: dict[str, str] | None,
22+
binpath: Path,
23+
pg_distrib_dir: Path,
24+
pg_version: PgVersion,
25+
workdir: Path,
26+
):
27+
if extra_env is None:
28+
env_vars = {}
29+
else:
30+
env_vars = extra_env.copy()
31+
32+
if not (binpath / self.COMMAND).exists():
33+
raise Exception(f"{self.COMMAND} binary not found at '{binpath}'")
34+
super().__init__(env_vars, binpath)
35+
36+
pg_dir = pg_distrib_dir / pg_version.v_prefixed
37+
self.pg_distrib_dir = pg_distrib_dir
38+
self.pg_version = pg_version
39+
self.pg_bin = pg_dir / "bin"
40+
if not (self.pg_bin / "postgres").exists():
41+
raise Exception(f"postgres binary was not found at '{self.pg_bin}'")
42+
self.pg_lib = pg_dir / "lib"
43+
if env_vars.get("LD_LIBRARY_PATH") is not None:
44+
self.pg_lib = Path(env_vars["LD_LIBRARY_PATH"])
45+
elif os.getenv("LD_LIBRARY_PATH") is not None:
46+
self.pg_lib = Path(str(os.getenv("LD_LIBRARY_PATH")))
47+
if not workdir.exists():
48+
raise Exception(f"Working directory '{workdir}' does not exist")
49+
self.workdir = workdir
50+
51+
def run(
52+
self,
53+
pg_port: int,
54+
source_connection_string: str | None = None,
55+
s3prefix: str | None = None,
56+
interactive: bool = False,
57+
) -> subprocess.CompletedProcess[str]:
58+
if self.cmd is not None:
59+
raise Exception("Command already executed")
60+
args = [
61+
f"--pg-bin-dir={self.pg_bin}",
62+
f"--pg-lib-dir={self.pg_lib}",
63+
f"--pg-port={pg_port}",
64+
f"--working-directory={self.workdir}",
65+
]
66+
if source_connection_string is not None:
67+
args.append(f"--source-connection-string={source_connection_string}")
68+
if s3prefix is not None:
69+
args.append(f"--s3-prefix={s3prefix}")
70+
if interactive:
71+
args.append("--interactive")
72+
73+
self.cmd = self.raw_cli(args)
74+
return self.cmd
75+
76+
def __enter__(self):
77+
return self
78+
79+
def __exit__(self, *args):
80+
if self.workdir.exists():
81+
shutil.rmtree(self.workdir)
82+
83+
84+
@pytest.fixture(scope="function")
85+
def fast_import(
86+
pg_version: PgVersion,
87+
test_output_dir: Path,
88+
neon_binpath: Path,
89+
pg_distrib_dir: Path,
90+
) -> Iterator[FastImport]:
91+
workdir = Path(tempfile.mkdtemp())
92+
with FastImport(None, neon_binpath, pg_distrib_dir, pg_version, workdir) as fi:
93+
yield fi
94+
95+
if fi.cmd is None:
96+
return
97+
98+
# dump stdout & stderr into test log dir
99+
with open(test_output_dir / "fast_import.stdout", "w") as f:
100+
f.write(fi.cmd.stdout)
101+
with open(test_output_dir / "fast_import.stderr", "w") as f:
102+
f.write(fi.cmd.stderr)
103+
104+
log.info("Written logs to %s", test_output_dir)

test_runner/regress/test_import_pgdata.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
import psycopg2.errors
88
import pytest
99
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
10+
from fixtures.fast_import import FastImport
1011
from fixtures.log_helper import log
11-
from fixtures.neon_fixtures import NeonEnvBuilder, VanillaPostgres
12+
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PgProtocol, VanillaPostgres
1213
from fixtures.pageserver.http import (
1314
ImportPgdataIdemptencyKey,
1415
PageserverApiException,
1516
)
1617
from fixtures.pg_version import PgVersion
18+
from fixtures.port_distributor import PortDistributor
1719
from fixtures.remote_storage import RemoteStorageKind
1820
from fixtures.utils import run_only_on_postgres
1921
from pytest_httpserver import HTTPServer
@@ -313,3 +315,41 @@ def validate_vanilla_equivalence(ep):
313315
validate_vanilla_equivalence(br_initdb_endpoint)
314316
with pytest.raises(psycopg2.errors.UndefinedTable):
315317
br_initdb_endpoint.safe_psql("select * from othertable")
318+
319+
320+
@run_only_on_postgres(
321+
[PgVersion.V14, PgVersion.V15, PgVersion.V16],
322+
"newer control file catalog version and struct format isn't supported",
323+
)
324+
def test_fast_import_binary(
325+
test_output_dir,
326+
vanilla_pg: VanillaPostgres,
327+
port_distributor: PortDistributor,
328+
fast_import: FastImport,
329+
):
330+
vanilla_pg.start()
331+
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
332+
333+
pg_port = port_distributor.get_port()
334+
fast_import.run(pg_port, vanilla_pg.connstr())
335+
vanilla_pg.stop()
336+
337+
pgbin = PgBin(test_output_dir, fast_import.pg_distrib_dir, fast_import.pg_version)
338+
with VanillaPostgres(
339+
fast_import.workdir / "pgdata", pgbin, pg_port, False
340+
) as new_pgdata_vanilla_pg:
341+
new_pgdata_vanilla_pg.start()
342+
343+
# database name and user are hardcoded in fast_import binary, and they are different from normal vanilla postgres
344+
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
345+
res = conn.safe_psql("SELECT count(*) FROM foo;")
346+
log.info(f"Result: {res}")
347+
assert res[0][0] == 10
348+
349+
350+
# TODO: Maybe test with pageserver?
351+
# 1. run whole neon env
352+
# 2. create timeline with some s3 path???
353+
# 3. run fast_import with s3 prefix
354+
# 4. ??? mock http where pageserver will report progress
355+
# 5. run compute on this timeline and check if data is there

0 commit comments

Comments
 (0)