From 01c06e5493177e451a3a81fd1a956f0cd94b6c6e Mon Sep 17 00:00:00 2001 From: Daria Lepikhova Date: Tue, 31 Aug 2021 17:49:42 +0500 Subject: [PATCH 01/22] Added tests for --wal-file-path --- tests/archive.py | 120 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/tests/archive.py b/tests/archive.py index 0ade2d66a..4966ec710 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -1801,6 +1801,126 @@ def test_archive_options_1(self): self.del_test_dir(module_name, fname) + # @unittest.skip("skip") + # @unittest.expectedFailure + def test_wal_file_path(self): + """ + check that archive-push works correct with undefined + --wal-file-path + """ + fname = self.id().split('.')[3] + backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') + node = self.make_simple_node( + base_dir=os.path.join(module_name, fname, 'node'), + set_replication=True, + initdb_params=['--data-checksums']) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, 'node', node) + + # FULL + self.backup_node(backup_dir, 'node', node) + node.pgbench_init(scale=1) + + node.cleanup() + + if self.get_version(node) >= self.version_to_num('12.0'): + recovery_conf = os.path.join(node.data_dir, 'postgresql.auto.conf') + else: + recovery_conf = os.path.join(node.data_dir, 'recovery.conf') + + with open(recovery_conf, 'r') as f: + recovery_content = f.read() + + self.assertIn( + "restore_command = '\"{0}\" archive-push -B \"{1}\" --instance \"{2}\" " + "--wal-file-name=%f --remote-host=localhost " + "--remote-port=22 --remote-user={3}'".format( + self.probackup_path, backup_dir, 'node', self.user), + recovery_content) + + self.del_test_dir(module_name, fname) + + # @unittest.skip("skip") + # @unittest.expectedFailure + def test_wal_file_path_2(self): + """ + check that archive-push works correct with --wal-file-path=%p as usual + """ + fname = self.id().split('.')[3] + backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') + node = self.make_simple_node( + base_dir=os.path.join(module_name, fname, 'node'), + set_replication=True, + initdb_params=['--data-checksums']) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, 'node', node) + + # FULL + self.backup_node(backup_dir, 'node', node) + node.pgbench_init(scale=1) + + node.cleanup() + + if self.get_version(node) >= self.version_to_num('12.0'): + recovery_conf = os.path.join(node.data_dir, 'postgresql.auto.conf') + else: + recovery_conf = os.path.join(node.data_dir, 'recovery.conf') + + with open(recovery_conf, 'r') as f: + recovery_content = f.read() + + self.assertIn( + "restore_command = '\"{0}\" archive-push -B \"{1}\" --instance \"{2}\" " + "--wal-file-path=%p --wal-file-name=%f --remote-host=localhost " + "--remote-port=22 --remote-user={3}'".format( + self.probackup_path, backup_dir, 'node', self.user), + recovery_content) + + self.del_test_dir(module_name, fname) + + # @unittest.skip("skip") + # @unittest.expectedFailure + def test_wal_file_path_3(self): + """ + check that archive-push works correct with --wal-file-path setting by user + """ + fname = self.id().split('.')[3] + backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') + node = self.make_simple_node( + base_dir=os.path.join(module_name, fname, 'node'), + set_replication=True, + initdb_params=['--data-checksums']) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, 'node', node) + + # FULL + self.backup_node(backup_dir, 'node', node) + node.pgbench_init(scale=1) + + node.cleanup() + + if self.get_version(node) >= self.version_to_num('12.0'): + recovery_conf = os.path.join(node.data_dir, 'postgresql.auto.conf') + else: + recovery_conf = os.path.join(node.data_dir, 'recovery.conf') + + with open(recovery_conf, 'r') as f: + recovery_content = f.read() + + test_wal_filepath = self.probackup_path + "/test_walpath" + + self.assertIn( + "restore_command = '\"{0}\" archive-push -B \"{1}\" --instance \"{2}\" " + "--wal-file-path={3} --wal-file-name=%f --remote-host=localhost " + "--remote-port=22 --remote-user={4}'".format( + self.probackup_path, backup_dir, 'node', test_wal_filepath, self.user), + recovery_content) + + self.del_test_dir(module_name, fname) + # @unittest.skip("skip") # @unittest.expectedFailure def test_hexadecimal_timeline(self): From 2d1347455f21e2e21d91ff54863c52f4520e108b Mon Sep 17 00:00:00 2001 From: Daria Lepikhova Date: Wed, 1 Sep 2021 14:13:31 +0500 Subject: [PATCH 02/22] Reterned wal-file-path option --- src/archive.c | 621 ++++++++++++++++++++++++++++++++++----------- src/pg_probackup.h | 1 + 2 files changed, 477 insertions(+), 145 deletions(-) diff --git a/src/archive.c b/src/archive.c index 6ac1062b8..46e565b3a 100644 --- a/src/archive.c +++ b/src/archive.c @@ -21,6 +21,8 @@ static int push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_ const char *archive_dir, bool overwrite, bool no_sync, int compress_level, uint32 archive_timeout); #endif +static void push_wal_file(const char *from_path, const char *to_path, + bool is_compress, bool overwrite, int compress_level); static void *push_files(void *arg); static void *get_files(void *arg); static bool get_wal_file(const char *filename, const char *from_path, const char *to_path, @@ -30,10 +32,12 @@ static int get_wal_file_internal(const char *from_path, const char *to_path, FIL #ifdef HAVE_LIBZ static const char *get_gz_error(gzFile gzf, int errnum); #endif -//static void copy_file_attributes(const char *from_path, -// fio_location from_location, -// const char *to_path, fio_location to_location, -// bool unlink_on_error); +static bool fileEqualCRC(const char *path1, const char *path2, + bool path2_is_compressed); +static void copy_file_attributes(const char *from_path, + fio_location from_location, + const char *to_path, fio_location to_location, + bool unlink_on_error); static bool next_wal_segment_exists(TimeLineID tli, XLogSegNo segno, const char *prefetch_dir, uint32 wal_seg_size); static uint32 run_wal_prefetch(const char *prefetch_dir, const char *archive_dir, TimeLineID tli, @@ -142,7 +146,7 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path, int n_threads; if (wal_file_name == NULL) - elog(ERROR, "Required parameter is not specified: --wal-file-name %%f"); + elog(INFO, "Required parameter is not specified: --wal-file-name %%f"); if (!getcwd(current_dir, sizeof(current_dir))) elog(ERROR, "getcwd() error"); @@ -156,147 +160,189 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path, if (system_id != instance->system_identifier) elog(ERROR, "Refuse to push WAL segment %s into archive. Instance parameters mismatch." "Instance '%s' should have SYSTEM_ID = " UINT64_FORMAT " instead of " UINT64_FORMAT, - wal_file_name, instance->name, instance->system_identifier, system_id); + wal_file_name, instance->name, instance->system_identifier, system_id); if (instance->compress_alg == PGLZ_COMPRESS) elog(ERROR, "Cannot use pglz for WAL compression"); - join_path_components(pg_xlog_dir, current_dir, XLOGDIR); - join_path_components(archive_status_dir, pg_xlog_dir, "archive_status"); + /* */ + elog(INFO, "instance %s", instance->backup_instance_path); + elog(INFO, "wal_file_path %s", wal_file_path); + if ((instance->backup_instance_path == wal_file_path) || (wal_file_path == NULL)) + { + if(wal_file_path == NULL) + { + elog(INFO, "Required parameter is not specified: --wal-file-path. Set by default"); + wal_file_path = instance->backup_instance_path; + } - /* Create 'archlog_path' directory. Do nothing if it already exists. */ - //fio_mkdir(instance->arclog_path, DIR_PERMISSION, FIO_BACKUP_HOST); + join_path_components(pg_xlog_dir, current_dir, XLOGDIR); + join_path_components(archive_status_dir, pg_xlog_dir, "archive_status"); + + /* Create 'archlog_path' directory. Do nothing if it already exists. */ + //fio_mkdir(instance->arclog_path, DIR_PERMISSION, FIO_BACKUP_HOST); #ifdef HAVE_LIBZ - if (instance->compress_alg == ZLIB_COMPRESS) - is_compress = true; + if (instance->compress_alg == ZLIB_COMPRESS) + is_compress = true; #endif - /* Setup filelist and locks */ - batch_files = setup_push_filelist(archive_status_dir, wal_file_name, batch_size); - - n_threads = num_threads; - if (num_threads > parray_num(batch_files)) - n_threads = parray_num(batch_files); - - elog(INFO, "pg_probackup archive-push WAL file: %s, " - "threads: %i/%i, batch: %lu/%i, compression: %s", - wal_file_name, n_threads, num_threads, - parray_num(batch_files), batch_size, - is_compress ? "zlib" : "none"); - - num_threads = n_threads; - - /* Single-thread push - * We don`t want to start multi-thread push, if number of threads in equal to 1, - * or the number of files ready to push is small. - * Multithreading in remote mode isn`t cheap, - * establishing ssh connection can take 100-200ms, so running and terminating - * one thread using generic multithread approach can take - * almost as much time as copying itself. - * TODO: maybe we should be more conservative and force single thread - * push if batch_files array is small. - */ - if (num_threads == 1 || (parray_num(batch_files) == 1)) - { - INSTR_TIME_SET_CURRENT(start_time); - for (i = 0; i < parray_num(batch_files); i++) + /* Setup filelist and locks */ + batch_files = setup_push_filelist(archive_status_dir, wal_file_name, batch_size); + + n_threads = num_threads; + if (num_threads > parray_num(batch_files)) + n_threads = parray_num(batch_files); + + elog(INFO, "pg_probackup archive-push WAL file: %s, " + "threads: %i/%i, batch: %lu/%i, compression: %s", + wal_file_name, n_threads, num_threads, + parray_num(batch_files), batch_size, + is_compress ? "zlib" : "none"); + + num_threads = n_threads; + + /* Single-thread push + * We don`t want to start multi-thread push, if number of threads in equal to 1, + * or the number of files ready to push is small. + * Multithreading in remote mode isn`t cheap, + * establishing ssh connection can take 100-200ms, so running and terminating + * one thread using generic multithread approach can take + * almost as much time as copying itself. + * TODO: maybe we should be more conservative and force single thread + * push if batch_files array is small. + */ + if (num_threads == 1 || (parray_num(batch_files) == 1)) { - int rc; - WALSegno *xlogfile = (WALSegno *) parray_get(batch_files, i); + INSTR_TIME_SET_CURRENT(start_time); + for (i = 0; i < parray_num(batch_files); i++) + { + int rc; + WALSegno *xlogfile = (WALSegno *) parray_get(batch_files, i); + + rc = push_file(xlogfile, archive_status_dir, + pg_xlog_dir, instance->arclog_path, + overwrite, no_sync, + instance->archive_timeout, + no_ready_rename || (strcmp(xlogfile->name, wal_file_name) == 0) ? true : false, + is_compress && IsXLogFileName(xlogfile->name) ? true : false, + instance->compress_level); + if (rc == 0) + n_total_pushed++; + else + n_total_skipped++; + } - rc = push_file(xlogfile, archive_status_dir, - pg_xlog_dir, instance->arclog_path, - overwrite, no_sync, - instance->archive_timeout, - no_ready_rename || (strcmp(xlogfile->name, wal_file_name) == 0) ? true : false, - is_compress && IsXLogFileName(xlogfile->name) ? true : false, - instance->compress_level); - if (rc == 0) - n_total_pushed++; - else - n_total_skipped++; + push_isok = true; + goto push_done; } - push_isok = true; - goto push_done; - } + /* init thread args with its own segno */ + threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); + threads_args = (archive_push_arg *) palloc(sizeof(archive_push_arg) * num_threads); - /* init thread args with its own segno */ - threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); - threads_args = (archive_push_arg *) palloc(sizeof(archive_push_arg) * num_threads); + for (i = 0; i < num_threads; i++) + { + archive_push_arg *arg = &(threads_args[i]); - for (i = 0; i < num_threads; i++) - { - archive_push_arg *arg = &(threads_args[i]); - - arg->first_filename = wal_file_name; - arg->archive_dir = instance->arclog_path; - arg->pg_xlog_dir = pg_xlog_dir; - arg->archive_status_dir = archive_status_dir; - arg->overwrite = overwrite; - arg->compress = is_compress; - arg->no_sync = no_sync; - arg->no_ready_rename = no_ready_rename; - arg->archive_timeout = instance->archive_timeout; - - arg->compress_alg = instance->compress_alg; - arg->compress_level = instance->compress_level; - - arg->files = batch_files; - arg->n_pushed = 0; - arg->n_skipped = 0; - - arg->thread_num = i+1; - /* By default there are some error */ - arg->ret = 1; - } + arg->first_filename = wal_file_name; + arg->archive_dir = instance->arclog_path; + arg->pg_xlog_dir = pg_xlog_dir; + arg->archive_status_dir = archive_status_dir; + arg->overwrite = overwrite; + arg->compress = is_compress; + arg->no_sync = no_sync; + arg->no_ready_rename = no_ready_rename; + arg->archive_timeout = instance->archive_timeout; - /* Run threads */ - INSTR_TIME_SET_CURRENT(start_time); - for (i = 0; i < num_threads; i++) - { - archive_push_arg *arg = &(threads_args[i]); - pthread_create(&threads[i], NULL, push_files, arg); - } + arg->compress_alg = instance->compress_alg; + arg->compress_level = instance->compress_level; - /* Wait threads */ - for (i = 0; i < num_threads; i++) - { - pthread_join(threads[i], NULL); - if (threads_args[i].ret == 1) + arg->files = batch_files; + arg->n_pushed = 0; + arg->n_skipped = 0; + + arg->thread_num = i+1; + /* By default there are some error */ + arg->ret = 1; + } + + /* Run threads */ + INSTR_TIME_SET_CURRENT(start_time); + for (i = 0; i < num_threads; i++) { - push_isok = false; - n_total_failed++; + archive_push_arg *arg = &(threads_args[i]); + pthread_create(&threads[i], NULL, push_files, arg); } - n_total_pushed += threads_args[i].n_pushed; - n_total_skipped += threads_args[i].n_skipped; - } + /* Wait threads */ + for (i = 0; i < num_threads; i++) + { + pthread_join(threads[i], NULL); + if (threads_args[i].ret == 1) + { + push_isok = false; + n_total_failed++; + } - /* Note, that we are leaking memory here, - * because pushing into archive is a very - * time-sensetive operation, so we skip freeing stuff. - */ + n_total_pushed += threads_args[i].n_pushed; + n_total_skipped += threads_args[i].n_skipped; + } + + /* Note, that we are leaking memory here, + * because pushing into archive is a very + * time-sensetive operation, so we skip freeing stuff. + */ push_done: - fio_disconnect(); - /* calculate elapsed time */ - INSTR_TIME_SET_CURRENT(end_time); - INSTR_TIME_SUBTRACT(end_time, start_time); - push_time = INSTR_TIME_GET_DOUBLE(end_time); - pretty_time_interval(push_time, pretty_time_str, 20); - - if (push_isok) - /* report number of files pushed into archive */ - elog(INFO, "pg_probackup archive-push completed successfully, " - "pushed: %u, skipped: %u, time elapsed: %s", - n_total_pushed, n_total_skipped, pretty_time_str); + fio_disconnect(); + /* calculate elapsed time */ + INSTR_TIME_SET_CURRENT(end_time); + INSTR_TIME_SUBTRACT(end_time, start_time); + push_time = INSTR_TIME_GET_DOUBLE(end_time); + pretty_time_interval(push_time, pretty_time_str, 20); + + if (push_isok) + /* report number of files pushed into archive */ + elog(INFO, "pg_probackup archive-push completed successfully, " + "pushed: %u, skipped: %u, time elapsed: %s", + n_total_pushed, n_total_skipped, pretty_time_str); + else + elog(ERROR, "pg_probackup archive-push failed, " + "pushed: %i, skipped: %u, failed: %u, time elapsed: %s", + n_total_pushed, n_total_skipped, n_total_failed, + pretty_time_str); + } else - elog(ERROR, "pg_probackup archive-push failed, " - "pushed: %i, skipped: %u, failed: %u, time elapsed: %s", - n_total_pushed, n_total_skipped, n_total_failed, - pretty_time_str); + { + /* If user passed a directory different from saved in pg_probackup.conf, + * work according to the scheme with a single copy of files + */ + char backup_wal_file_path[MAXPGPATH]; + char absolute_wal_file_path[MAXPGPATH]; + + /* Check that -j option is not set with --wal-file-path option and ignore it*/ + if (num_threads > 1) + elog(ERROR, "Option -j is not working with user defined --wal-file-path. Ignore"); + + join_path_components(absolute_wal_file_path, current_dir, wal_file_path); + join_path_components(backup_wal_file_path, instance->arclog_path, wal_file_name); + + elog(INFO, "pg_probackup archive-push from %s to %s", absolute_wal_file_path, backup_wal_file_path); + + if (instance->compress_alg == PGLZ_COMPRESS) + elog(ERROR, "pglz compression is not supported"); + +#ifdef HAVE_LIBZ + if (instance->compress_alg == ZLIB_COMPRESS) + is_compress = IsXLogFileName(wal_file_name); +#endif + + push_wal_file(absolute_wal_file_path, backup_wal_file_path, is_compress, + overwrite, instance->compress_level); + elog(INFO, "pg_probackup archive-push completed successfully"); + } + } /* ------------- INTERNAL FUNCTIONS ---------- */ @@ -888,6 +934,232 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir, } #endif +/* + * Copy WAL segment from pgdata to archive catalog with possible compression. + */ +void +push_wal_file(const char *from_path, const char *to_path, bool is_compress, + bool overwrite, int compress_level) +{ + FILE *in = NULL; + int out = -1; + char buf[XLOG_BLCKSZ]; + const char *to_path_p; + char to_path_temp[MAXPGPATH]; + int errno_temp; + /* partial handling */ + struct stat st; + int partial_try_count = 0; + int partial_file_size = 0; + bool partial_file_exists = false; + +#ifdef HAVE_LIBZ + char gz_to_path[MAXPGPATH]; + gzFile gz_out = NULL; + + if (is_compress) + { + snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path); + to_path_p = gz_to_path; + } + else +#endif + to_path_p = to_path; + + /* open file for read */ + in = fio_fopen(from_path, PG_BINARY_R, FIO_DB_HOST); + if (in == NULL) + elog(ERROR, "Cannot open source WAL file \"%s\": %s", from_path, + strerror(errno)); + + /* Check if possible to skip copying */ + if (fileExists(to_path_p, FIO_BACKUP_HOST)) + { + if (fileEqualCRC(from_path, to_path_p, is_compress)) + return; + /* Do not copy and do not rise error. Just quit as normal. */ + else if (!overwrite) + elog(ERROR, "WAL segment \"%s\" already exists.", to_path_p); + } + + /* open backup file for write */ +#ifdef HAVE_LIBZ + if (is_compress) + { + snprintf(to_path_temp, sizeof(to_path_temp), "%s.part", gz_to_path); + + gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, compress_level, FIO_BACKUP_HOST); + if (gz_out == NULL) + { + partial_file_exists = true; + elog(WARNING, "Cannot open destination temporary WAL file \"%s\": %s", + to_path_temp, strerror(errno)); + } + } + else +#endif + { + snprintf(to_path_temp, sizeof(to_path_temp), "%s.part", to_path); + + out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_BACKUP_HOST); + if (out < 0) + { + partial_file_exists = true; + elog(WARNING, "Cannot open destination temporary WAL file \"%s\": %s", + to_path_temp, strerror(errno)); + } + } + + /* Partial file is already exists, it could have happened due to failed archive-push, + * in this case partial file can be discarded, or due to concurrent archiving. + * + * Our main goal here is to try to handle partial file to prevent stalling of + * continious archiving. + * To ensure that ecncountered partial file is actually a stale "orphaned" file, + * check its size every second. + * If the size has not changed in PARTIAL_WAL_TIMER seconds, we can consider + * the file stale and reuse it. + * If file size is changing, it means that another archiver works at the same + * directory with the same files. Such partial files cannot be reused. + */ + if (partial_file_exists) + { + while (partial_try_count < PARTIAL_WAL_TIMER) + { + + if (fio_stat(to_path_temp, &st, false, FIO_BACKUP_HOST) < 0) + /* It is ok if partial is gone, we can safely error out */ + elog(ERROR, "Cannot stat destination temporary WAL file \"%s\": %s", to_path_temp, + strerror(errno)); + + /* first round */ + if (!partial_try_count) + partial_file_size = st.st_size; + + /* file size is changing */ + if (st.st_size > partial_file_size) + elog(ERROR, "Destination temporary WAL file \"%s\" is not stale", to_path_temp); + + sleep(1); + partial_try_count++; + } + + /* Partial segment is considered stale, so reuse it */ + elog(WARNING, "Reusing stale destination temporary WAL file \"%s\"", to_path_temp); + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + +#ifdef HAVE_LIBZ + if (is_compress) + { + gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, compress_level, FIO_BACKUP_HOST); + if (gz_out == NULL) + elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s", + to_path_temp, strerror(errno)); + } + else +#endif + { + out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_BACKUP_HOST); + if (out < 0) + elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s", + to_path_temp, strerror(errno)); + } + } + + /* copy content */ + for (;;) + { + ssize_t read_len = 0; + + read_len = fio_fread(in, buf, sizeof(buf)); + + if (read_len < 0) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, + "Cannot read source WAL file \"%s\": %s", + from_path, strerror(errno_temp)); + } + + if (read_len > 0) + { +#ifdef HAVE_LIBZ + if (is_compress) + { + if (fio_gzwrite(gz_out, buf, read_len) != read_len) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, "Cannot write to compressed WAL file \"%s\": %s", + to_path_temp, get_gz_error(gz_out, errno_temp)); + } + } + else +#endif + { + if (fio_write(out, buf, read_len) != read_len) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, "Cannot write to WAL file \"%s\": %s", + to_path_temp, strerror(errno_temp)); + } + } + } + + if (read_len == 0) + break; + } + +#ifdef HAVE_LIBZ + if (is_compress) + { + if (fio_gzclose(gz_out) != 0) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", + to_path_temp, get_gz_error(gz_out, errno_temp)); + } + } + else +#endif + { + if (fio_flush(out) != 0 || fio_close(out) != 0) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, "Cannot write WAL file \"%s\": %s", + to_path_temp, strerror(errno_temp)); + } + } + + if (fio_fclose(in)) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, "Cannot close source WAL file \"%s\": %s", + from_path, strerror(errno_temp)); + } + + /* update file permission. */ + copy_file_attributes(from_path, FIO_DB_HOST, to_path_temp, FIO_BACKUP_HOST, true); + + if (fio_rename(to_path_temp, to_path_p, FIO_BACKUP_HOST) < 0) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s", + to_path_temp, to_path_p, strerror(errno_temp)); + } + +#ifdef HAVE_LIBZ + if (is_compress) + elog(INFO, "WAL file compressed to \"%s\"", gz_to_path); +#endif +} + #ifdef HAVE_LIBZ /* * Show error during work with compressed file @@ -906,30 +1178,89 @@ get_gz_error(gzFile gzf, int errnum) } #endif + +/* + * compare CRC of two WAL files. + * If necessary, decompress WAL file from path2 + */ +static bool +fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) +{ + pg_crc32 crc1; + pg_crc32 crc2; + + /* Get checksum of backup file */ +#ifdef HAVE_LIBZ + if (path2_is_compressed) + { + char buf [1024]; + gzFile gz_in = NULL; + + INIT_FILE_CRC32(true, crc2); + gz_in = fio_gzopen(path2, PG_BINARY_R, Z_DEFAULT_COMPRESSION, FIO_BACKUP_HOST); + if (gz_in == NULL) + /* File cannot be read */ + elog(ERROR, + "Cannot compare WAL file \"%s\" with compressed \"%s\"", + path1, path2); + + for (;;) + { + int read_len = fio_gzread(gz_in, buf, sizeof(buf)); + if (read_len <= 0 && !fio_gzeof(gz_in)) + { + /* An error occurred while reading the file */ + elog(WARNING, + "Cannot compare WAL file \"%s\" with compressed \"%s\": %d", + path1, path2, read_len); + return false; + } + COMP_FILE_CRC32(true, crc2, buf, read_len); + if (fio_gzeof(gz_in) || read_len == 0) + break; + } + FIN_FILE_CRC32(true, crc2); + + if (fio_gzclose(gz_in) != 0) + elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", + path2, get_gz_error(gz_in, errno)); + } + else +#endif + { + crc2 = fio_get_crc32(path2, FIO_BACKUP_HOST, true); + } + + /* Get checksum of original file */ + crc1 = fio_get_crc32(path1, FIO_DB_HOST, true); + + return EQ_CRC32C(crc1, crc2); +} + /* Copy file attributes */ -//static void -//copy_file_attributes(const char *from_path, fio_location from_location, -// const char *to_path, fio_location to_location, -// bool unlink_on_error) -//{ -// struct stat st; -// -// if (fio_stat(from_path, &st, true, from_location) == -1) -// { -// if (unlink_on_error) -// fio_unlink(to_path, to_location); -// elog(ERROR, "Cannot stat file \"%s\": %s", -// from_path, strerror(errno)); -// } -// -// if (fio_chmod(to_path, st.st_mode, to_location) == -1) -// { -// if (unlink_on_error) -// fio_unlink(to_path, to_location); -// elog(ERROR, "Cannot change mode of file \"%s\": %s", -// to_path, strerror(errno)); -// } -//} +static void +copy_file_attributes(const char *from_path, fio_location from_location, + const char *to_path, fio_location to_location, + bool unlink_on_error) +{ + struct stat st; + + if (fio_stat(from_path, &st, true, from_location) == -1) + { + if (unlink_on_error) + fio_unlink(to_path, to_location); + elog(ERROR, "Cannot stat file \"%s\": %s", + from_path, strerror(errno)); + } + + if (fio_chmod(to_path, st.st_mode, to_location) == -1) + { + if (unlink_on_error) + fio_unlink(to_path, to_location); + elog(ERROR, "Cannot change mode of file \"%s\": %s", + to_path, strerror(errno)); + } +} /* Look for files with '.ready' suffix in archive_status directory * and pack such files into batch sized array. diff --git a/src/pg_probackup.h b/src/pg_probackup.h index c9792ba1f..693686a63 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -78,6 +78,7 @@ extern const char *PROGRAM_EMAIL; #define HEADER_MAP_TMP "page_header_map_tmp" /* Timeout defaults */ +#define PARTIAL_WAL_TIMER 60 #define ARCHIVE_TIMEOUT_DEFAULT 300 #define REPLICA_TIMEOUT_DEFAULT 300 #define LOCK_TIMEOUT 60 From fcb4dae320109d281cc821a1694ac6e2a9ce57bb Mon Sep 17 00:00:00 2001 From: Daria Lepikhova Date: Fri, 3 Sep 2021 15:31:42 +0500 Subject: [PATCH 03/22] Fix tests for checking wal-file-path option --- .travis.yml | 7 ++++ tests/archive.py | 101 ++++++++++++++++++++++------------------------- 2 files changed, 54 insertions(+), 54 deletions(-) diff --git a/.travis.yml b/.travis.yml index b6b8fd217..dcae1feb4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -44,6 +44,13 @@ env: # - PG_VERSION=12 PG_BRANCH=REL_12_STABLE MODE=retention # - PG_VERSION=12 PG_BRANCH=REL_12_STABLE MODE=restore - PG_VERSION=15 PG_BRANCH=master + - PG_VERSION=14 PG_BRANCH=REL_14_STABLE MODE=archive + - PG_VERSION=13 PG_BRANCH=REL_13_STABLE MODE=archive + - PG_VERSION=12 PG_BRANCH=REL_12_STABLE MODE=archive + - PG_VERSION=11 PG_BRANCH=REL_11_STABLE MODE=archive + - PG_VERSION=10 PG_BRANCH=REL_10_STABLE MODE=archive + - PG_VERSION=9.6 PG_BRANCH=REL9_6_STABLE MODE=archive + - PG_VERSION=9.5 PG_BRANCH=REL9_5_STABLE MODE=archive jobs: allow_failures: diff --git a/tests/archive.py b/tests/archive.py index 4966ec710..9c934ec6d 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -1810,34 +1810,35 @@ def test_wal_file_path(self): """ fname = self.id().split('.')[3] backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') + self.set_archiving node = self.make_simple_node( base_dir=os.path.join(module_name, fname, 'node'), set_replication=True, - initdb_params=['--data-checksums']) + initdb_params=['--data-checksums'], + pg_options={ + 'archive_mode': 'on'}) self.init_pb(backup_dir) self.add_instance(backup_dir, 'node', node) + self.set_archiving(backup_dir, 'node', node, compress=True) + archive_command = '\"{0}\" archive-push -B \"{1}\" --instance \"{2}\" --wal-file-name=%f'.format( + self.probackup_path, backup_dir, 'node') + self.set_auto_conf( + node, + {'archive_command': archive_command}) + + node.slow_start() # FULL self.backup_node(backup_dir, 'node', node) - node.pgbench_init(scale=1) - node.cleanup() + log_file = os.path.join(node.logs_dir, 'postgresql.log') + with open(log_file, 'r') as f: + log_content = f.read() - if self.get_version(node) >= self.version_to_num('12.0'): - recovery_conf = os.path.join(node.data_dir, 'postgresql.auto.conf') - else: - recovery_conf = os.path.join(node.data_dir, 'recovery.conf') + self.assertIn('Required parameter is not specified: --wal_file_path', log_content) - with open(recovery_conf, 'r') as f: - recovery_content = f.read() - - self.assertIn( - "restore_command = '\"{0}\" archive-push -B \"{1}\" --instance \"{2}\" " - "--wal-file-name=%f --remote-host=localhost " - "--remote-port=22 --remote-user={3}'".format( - self.probackup_path, backup_dir, 'node', self.user), - recovery_content) + self.assertIn('pg_probackup archive-push completed successfully', log_content) self.del_test_dir(module_name, fname) @@ -1852,32 +1853,25 @@ def test_wal_file_path_2(self): node = self.make_simple_node( base_dir=os.path.join(module_name, fname, 'node'), set_replication=True, - initdb_params=['--data-checksums']) + initdb_params=['--data-checksums'], + pg_options={ + 'archive_mode': 'on', + 'archive_command': 'exit 0'}) self.init_pb(backup_dir) self.add_instance(backup_dir, 'node', node) + self.set_archiving(backup_dir, 'node', node, compress=True) - # FULL - self.backup_node(backup_dir, 'node', node) - node.pgbench_init(scale=1) - - node.cleanup() - - if self.get_version(node) >= self.version_to_num('12.0'): - recovery_conf = os.path.join(node.data_dir, 'postgresql.auto.conf') - else: - recovery_conf = os.path.join(node.data_dir, 'recovery.conf') + node.slow_start() - with open(recovery_conf, 'r') as f: - recovery_content = f.read() + # FULL + self.backup_node(backup_dir, 'node', node, options=['--stream']) - self.assertIn( - "restore_command = '\"{0}\" archive-push -B \"{1}\" --instance \"{2}\" " - "--wal-file-path=%p --wal-file-name=%f --remote-host=localhost " - "--remote-port=22 --remote-user={3}'".format( - self.probackup_path, backup_dir, 'node', self.user), - recovery_content) + log_file = os.path.join(node.logs_dir, 'postgresql.log') + with open(log_file, 'r') as f: + log_content = f.read() + self.assertIn('pg_probackup archive-push completed successfully', log_content) self.del_test_dir(module_name, fname) # @unittest.skip("skip") @@ -1891,33 +1885,32 @@ def test_wal_file_path_3(self): node = self.make_simple_node( base_dir=os.path.join(module_name, fname, 'node'), set_replication=True, - initdb_params=['--data-checksums']) + initdb_params=['--data-checksums'], + pg_options={ + 'archive_mode': 'on'}) self.init_pb(backup_dir) self.add_instance(backup_dir, 'node', node) + self.set_archiving(backup_dir, 'node', node, compress=True) + walfilepath = backup_dir + '/..' + archive_command = '\"{0}\" archive-push -B \"{1}\"'.format(self.probackup_path, backup_dir) + archive_command += '--instance \"{0}\"'.format('node') + archive_command += ' --wal-file-path=\"{0}\" --wal-file-name=%f'.format(walfilepath) - # FULL - self.backup_node(backup_dir, 'node', node) - node.pgbench_init(scale=1) - - node.cleanup() + self.set_auto_conf( + node, + {'archive_command': archive_command}) - if self.get_version(node) >= self.version_to_num('12.0'): - recovery_conf = os.path.join(node.data_dir, 'postgresql.auto.conf') - else: - recovery_conf = os.path.join(node.data_dir, 'recovery.conf') + node.slow_start() - with open(recovery_conf, 'r') as f: - recovery_content = f.read() + # FULL + self.backup_node(backup_dir, 'node', node) - test_wal_filepath = self.probackup_path + "/test_walpath" + log_file = os.path.join(node.logs_dir, 'postgresql.log') + with open(log_file, 'r') as f: + log_content = f.read() - self.assertIn( - "restore_command = '\"{0}\" archive-push -B \"{1}\" --instance \"{2}\" " - "--wal-file-path={3} --wal-file-name=%f --remote-host=localhost " - "--remote-port=22 --remote-user={4}'".format( - self.probackup_path, backup_dir, 'node', test_wal_filepath, self.user), - recovery_content) + self.assertIn('wal_file_path is setted by user', log_content) self.del_test_dir(module_name, fname) From 8dd36c1d305bf83af2614e79a3964ffdb7842e55 Mon Sep 17 00:00:00 2001 From: Daria Lepikhova Date: Thu, 16 Sep 2021 16:03:25 +0500 Subject: [PATCH 04/22] Fix test_wal_file_path_3 for checking option wal-file-path setted by user --- tests/archive.py | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/tests/archive.py b/tests/archive.py index 9c934ec6d..a353c8037 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -1836,8 +1836,6 @@ def test_wal_file_path(self): with open(log_file, 'r') as f: log_content = f.read() - self.assertIn('Required parameter is not specified: --wal_file_path', log_content) - self.assertIn('pg_probackup archive-push completed successfully', log_content) self.del_test_dir(module_name, fname) @@ -1855,18 +1853,30 @@ def test_wal_file_path_2(self): set_replication=True, initdb_params=['--data-checksums'], pg_options={ - 'archive_mode': 'on', - 'archive_command': 'exit 0'}) + 'archive_mode': 'on'}) self.init_pb(backup_dir) self.add_instance(backup_dir, 'node', node) self.set_archiving(backup_dir, 'node', node, compress=True) + archive_command = '\"{0}\" archive-push -B \"{1}\" --instance \"{2}\" --wal-file-path=%p --wal-file-name=%f'.format( + self.probackup_path, backup_dir, 'node') + self.set_auto_conf( + node, + {'archive_command': archive_command}) node.slow_start() + node.safe_psql( + "postgres", + "create table t_heap as select i" + " as id from generate_series(0,100) i") # FULL self.backup_node(backup_dir, 'node', node, options=['--stream']) +# self.run_pb(["archive-push", "-B", backup_dir, +# "--instance=node", "-D", node.data_dir, +# "--wal-file-name=%%f"])#"--wal-file-path=%%p", + log_file = os.path.join(node.logs_dir, 'postgresql.log') with open(log_file, 'r') as f: log_content = f.read() @@ -1892,19 +1902,22 @@ def test_wal_file_path_3(self): self.init_pb(backup_dir) self.add_instance(backup_dir, 'node', node) self.set_archiving(backup_dir, 'node', node, compress=True) - walfilepath = backup_dir + '/..' - archive_command = '\"{0}\" archive-push -B \"{1}\"'.format(self.probackup_path, backup_dir) - archive_command += '--instance \"{0}\"'.format('node') - archive_command += ' --wal-file-path=\"{0}\" --wal-file-name=%f'.format(walfilepath) - self.set_auto_conf( - node, - {'archive_command': archive_command}) + wal_dir = os.path.join(backup_dir, 'wal', 'node') + self.set_auto_conf(node, {'archive_command': "cp -r \"{0}\" \"{1}\"".format(backup_dir, wal_dir)}) node.slow_start() + node.safe_psql( + "postgres", + "create table t_heap as select i" + " as id from generate_series(0,100) i") # FULL - self.backup_node(backup_dir, 'node', node) + self.backup_node(backup_dir, 'node', node, options=['--stream']) + + self.run_pb(["archive-push", "-B", backup_dir, + "--instance=node", "-D", node.data_dir, + "--wal-file-path", wal_dir, "--wal-file-name=%f"]) log_file = os.path.join(node.logs_dir, 'postgresql.log') with open(log_file, 'r') as f: From 3b870697729f19b22c8ca43fb7d3217f6321cbee Mon Sep 17 00:00:00 2001 From: Daria Lepikhova Date: Mon, 27 Sep 2021 20:50:55 +0500 Subject: [PATCH 05/22] Fixes for --wal-file-path option --- src/archive.c | 322 ++++++++++++++++++++++++-------------------------- 1 file changed, 157 insertions(+), 165 deletions(-) diff --git a/src/archive.c b/src/archive.c index 46e565b3a..2fe9142dd 100644 --- a/src/archive.c +++ b/src/archive.c @@ -145,8 +145,51 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path, parray *batch_files = NULL; int n_threads; + char xlog_wal_path[MAXPGPATH]; + if (wal_file_name == NULL) - elog(INFO, "Required parameter is not specified: --wal-file-name %%f"); + elog(ERROR, "Required parameter is not specified: --wal-file-name %%f"); + + join_path_components(xlog_wal_path, PG_XLOG_DIR, wal_file_name); + + if (wal_file_path == NULL) + { + elog(INFO, "Required parameter is not specified: --wal_file_path %%p " + "Setting wal-file-path by default"); + wal_file_path = xlog_wal_path; + } + + if (strcmp(wal_file_path, xlog_wal_path)!=0) + { + char backup_wal_file_path[MAXPGPATH]; + char absolute_wal_file_path[MAXPGPATH]; + + join_path_components(absolute_wal_file_path, current_dir, wal_file_path); + join_path_components(backup_wal_file_path, instance->arclog_path, wal_file_name); + + elog(INFO, "wal_file_path is setted by user %s", wal_file_path); + if (instance->compress_alg == PGLZ_COMPRESS) + elog(ERROR, "pglz compression is not supported"); + +#ifdef HAVE_LIBZ + if (instance->compress_alg == ZLIB_COMPRESS) + is_compress = IsXLogFileName(wal_file_name); + #endif + + push_wal_file(absolute_wal_file_path, backup_wal_file_path, is_compress, + overwrite, instance->compress_level); + elog(INFO, "pg_probackup archive-push completed successfully"); + + return; + } + + /* Create 'archlog_path' directory. Do nothing if it already exists. */ + //fio_mkdir(instance->arclog_path, DIR_PERMISSION, FIO_BACKUP_HOST); + +#ifdef HAVE_LIBZ + if (instance->compress_alg == ZLIB_COMPRESS) + is_compress = true; +#endif if (!getcwd(current_dir, sizeof(current_dir))) elog(ERROR, "getcwd() error"); @@ -160,189 +203,139 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path, if (system_id != instance->system_identifier) elog(ERROR, "Refuse to push WAL segment %s into archive. Instance parameters mismatch." "Instance '%s' should have SYSTEM_ID = " UINT64_FORMAT " instead of " UINT64_FORMAT, - wal_file_name, instance->name, instance->system_identifier, system_id); + wal_file_name, instance->name, instance->system_identifier, system_id); if (instance->compress_alg == PGLZ_COMPRESS) elog(ERROR, "Cannot use pglz for WAL compression"); - /* */ - elog(INFO, "instance %s", instance->backup_instance_path); - elog(INFO, "wal_file_path %s", wal_file_path); - if ((instance->backup_instance_path == wal_file_path) || (wal_file_path == NULL)) + join_path_components(pg_xlog_dir, current_dir, XLOGDIR); + join_path_components(archive_status_dir, pg_xlog_dir, "archive_status"); + + /* Setup filelist and locks */ + batch_files = setup_push_filelist(archive_status_dir, wal_file_name, batch_size); + + n_threads = num_threads; + if (num_threads > parray_num(batch_files)) + n_threads = parray_num(batch_files); + + elog(INFO, "pg_probackup archive-push WAL file: %s, " + "threads: %i/%i, batch: %lu/%i, compression: %s", + wal_file_name, n_threads, num_threads, + parray_num(batch_files), batch_size, + is_compress ? "zlib" : "none"); + + num_threads = n_threads; + + /* Single-thread push + * We don`t want to start multi-thread push, if number of threads in equal to 1, + * or the number of files ready to push is small. + * Multithreading in remote mode isn`t cheap, + * establishing ssh connection can take 100-200ms, so running and terminating + * one thread using generic multithread approach can take + * almost as much time as copying itself. + * TODO: maybe we should be more conservative and force single thread + * push if batch_files array is small. + */ + if (num_threads == 1 || (parray_num(batch_files) == 1)) { - if(wal_file_path == NULL) - { - elog(INFO, "Required parameter is not specified: --wal-file-path. Set by default"); - wal_file_path = instance->backup_instance_path; - } - - join_path_components(pg_xlog_dir, current_dir, XLOGDIR); - join_path_components(archive_status_dir, pg_xlog_dir, "archive_status"); - - /* Create 'archlog_path' directory. Do nothing if it already exists. */ - //fio_mkdir(instance->arclog_path, DIR_PERMISSION, FIO_BACKUP_HOST); - -#ifdef HAVE_LIBZ - if (instance->compress_alg == ZLIB_COMPRESS) - is_compress = true; -#endif - - /* Setup filelist and locks */ - batch_files = setup_push_filelist(archive_status_dir, wal_file_name, batch_size); - - n_threads = num_threads; - if (num_threads > parray_num(batch_files)) - n_threads = parray_num(batch_files); - - elog(INFO, "pg_probackup archive-push WAL file: %s, " - "threads: %i/%i, batch: %lu/%i, compression: %s", - wal_file_name, n_threads, num_threads, - parray_num(batch_files), batch_size, - is_compress ? "zlib" : "none"); - - num_threads = n_threads; - - /* Single-thread push - * We don`t want to start multi-thread push, if number of threads in equal to 1, - * or the number of files ready to push is small. - * Multithreading in remote mode isn`t cheap, - * establishing ssh connection can take 100-200ms, so running and terminating - * one thread using generic multithread approach can take - * almost as much time as copying itself. - * TODO: maybe we should be more conservative and force single thread - * push if batch_files array is small. - */ - if (num_threads == 1 || (parray_num(batch_files) == 1)) + INSTR_TIME_SET_CURRENT(start_time); + for (i = 0; i < parray_num(batch_files); i++) { - INSTR_TIME_SET_CURRENT(start_time); - for (i = 0; i < parray_num(batch_files); i++) - { - int rc; - WALSegno *xlogfile = (WALSegno *) parray_get(batch_files, i); - - rc = push_file(xlogfile, archive_status_dir, - pg_xlog_dir, instance->arclog_path, - overwrite, no_sync, - instance->archive_timeout, - no_ready_rename || (strcmp(xlogfile->name, wal_file_name) == 0) ? true : false, - is_compress && IsXLogFileName(xlogfile->name) ? true : false, - instance->compress_level); - if (rc == 0) - n_total_pushed++; - else - n_total_skipped++; - } + int rc; + WALSegno *xlogfile = (WALSegno *) parray_get(batch_files, i); - push_isok = true; - goto push_done; + rc = push_file(xlogfile, archive_status_dir, + pg_xlog_dir, instance->arclog_path, + overwrite, no_sync, + instance->archive_timeout, + no_ready_rename || (strcmp(xlogfile->name, wal_file_name) == 0) ? true : false, + is_compress && IsXLogFileName(xlogfile->name) ? true : false, + instance->compress_level); + if (rc == 0) + n_total_pushed++; + else + n_total_skipped++; } - /* init thread args with its own segno */ - threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); - threads_args = (archive_push_arg *) palloc(sizeof(archive_push_arg) * num_threads); - - for (i = 0; i < num_threads; i++) - { - archive_push_arg *arg = &(threads_args[i]); - - arg->first_filename = wal_file_name; - arg->archive_dir = instance->arclog_path; - arg->pg_xlog_dir = pg_xlog_dir; - arg->archive_status_dir = archive_status_dir; - arg->overwrite = overwrite; - arg->compress = is_compress; - arg->no_sync = no_sync; - arg->no_ready_rename = no_ready_rename; - arg->archive_timeout = instance->archive_timeout; + push_isok = true; + goto push_done; + } - arg->compress_alg = instance->compress_alg; - arg->compress_level = instance->compress_level; + /* init thread args with its own segno */ + threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); + threads_args = (archive_push_arg *) palloc(sizeof(archive_push_arg) * num_threads); - arg->files = batch_files; - arg->n_pushed = 0; - arg->n_skipped = 0; + for (i = 0; i < num_threads; i++) + { + archive_push_arg *arg = &(threads_args[i]); + + arg->first_filename = wal_file_name; + arg->archive_dir = instance->arclog_path; + arg->pg_xlog_dir = pg_xlog_dir; + arg->archive_status_dir = archive_status_dir; + arg->overwrite = overwrite; + arg->compress = is_compress; + arg->no_sync = no_sync; + arg->no_ready_rename = no_ready_rename; + arg->archive_timeout = instance->archive_timeout; + + arg->compress_alg = instance->compress_alg; + arg->compress_level = instance->compress_level; + + arg->files = batch_files; + arg->n_pushed = 0; + arg->n_skipped = 0; + + arg->thread_num = i+1; + /* By default there are some error */ + arg->ret = 1; + } - arg->thread_num = i+1; - /* By default there are some error */ - arg->ret = 1; - } + /* Run threads */ + INSTR_TIME_SET_CURRENT(start_time); + for (i = 0; i < num_threads; i++) + { + archive_push_arg *arg = &(threads_args[i]); + pthread_create(&threads[i], NULL, push_files, arg); + } - /* Run threads */ - INSTR_TIME_SET_CURRENT(start_time); - for (i = 0; i < num_threads; i++) + /* Wait threads */ + for (i = 0; i < num_threads; i++) + { + pthread_join(threads[i], NULL); + if (threads_args[i].ret == 1) { - archive_push_arg *arg = &(threads_args[i]); - pthread_create(&threads[i], NULL, push_files, arg); + push_isok = false; + n_total_failed++; } - /* Wait threads */ - for (i = 0; i < num_threads; i++) - { - pthread_join(threads[i], NULL); - if (threads_args[i].ret == 1) - { - push_isok = false; - n_total_failed++; - } - - n_total_pushed += threads_args[i].n_pushed; - n_total_skipped += threads_args[i].n_skipped; - } + n_total_pushed += threads_args[i].n_pushed; + n_total_skipped += threads_args[i].n_skipped; + } - /* Note, that we are leaking memory here, - * because pushing into archive is a very - * time-sensetive operation, so we skip freeing stuff. - */ + /* Note, that we are leaking memory here, + * because pushing into archive is a very + * time-sensetive operation, so we skip freeing stuff. + */ push_done: - fio_disconnect(); - /* calculate elapsed time */ - INSTR_TIME_SET_CURRENT(end_time); - INSTR_TIME_SUBTRACT(end_time, start_time); - push_time = INSTR_TIME_GET_DOUBLE(end_time); - pretty_time_interval(push_time, pretty_time_str, 20); - - if (push_isok) - /* report number of files pushed into archive */ - elog(INFO, "pg_probackup archive-push completed successfully, " - "pushed: %u, skipped: %u, time elapsed: %s", - n_total_pushed, n_total_skipped, pretty_time_str); - else - elog(ERROR, "pg_probackup archive-push failed, " - "pushed: %i, skipped: %u, failed: %u, time elapsed: %s", - n_total_pushed, n_total_skipped, n_total_failed, - pretty_time_str); - } + fio_disconnect(); + /* calculate elapsed time */ + INSTR_TIME_SET_CURRENT(end_time); + INSTR_TIME_SUBTRACT(end_time, start_time); + push_time = INSTR_TIME_GET_DOUBLE(end_time); + pretty_time_interval(push_time, pretty_time_str, 20); + + if (push_isok) + /* report number of files pushed into archive */ + elog(INFO, "pg_probackup archive-push completed successfully, " + "pushed: %u, skipped: %u, time elapsed: %s", + n_total_pushed, n_total_skipped, pretty_time_str); else - { - /* If user passed a directory different from saved in pg_probackup.conf, - * work according to the scheme with a single copy of files - */ - char backup_wal_file_path[MAXPGPATH]; - char absolute_wal_file_path[MAXPGPATH]; - - /* Check that -j option is not set with --wal-file-path option and ignore it*/ - if (num_threads > 1) - elog(ERROR, "Option -j is not working with user defined --wal-file-path. Ignore"); - - join_path_components(absolute_wal_file_path, current_dir, wal_file_path); - join_path_components(backup_wal_file_path, instance->arclog_path, wal_file_name); - - elog(INFO, "pg_probackup archive-push from %s to %s", absolute_wal_file_path, backup_wal_file_path); - - if (instance->compress_alg == PGLZ_COMPRESS) - elog(ERROR, "pglz compression is not supported"); - -#ifdef HAVE_LIBZ - if (instance->compress_alg == ZLIB_COMPRESS) - is_compress = IsXLogFileName(wal_file_name); -#endif - - push_wal_file(absolute_wal_file_path, backup_wal_file_path, is_compress, - overwrite, instance->compress_level); - elog(INFO, "pg_probackup archive-push completed successfully"); - } - + elog(ERROR, "pg_probackup archive-push failed, " + "pushed: %i, skipped: %u, failed: %u, time elapsed: %s", + n_total_pushed, n_total_skipped, n_total_failed, + pretty_time_str); } /* ------------- INTERNAL FUNCTIONS ---------- */ @@ -1178,7 +1171,6 @@ get_gz_error(gzFile gzf, int errnum) } #endif - /* * compare CRC of two WAL files. * If necessary, decompress WAL file from path2 From 4e6682118d358fefc3aab9339a420b1bbdd0dd96 Mon Sep 17 00:00:00 2001 From: Daria Lepikhova Date: Mon, 4 Oct 2021 16:20:51 +0300 Subject: [PATCH 06/22] Added pg_probackup.log reading for checking --wal-file-path option --- tests/archive.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/tests/archive.py b/tests/archive.py index a353c8037..fdb8bc2aa 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -1873,10 +1873,6 @@ def test_wal_file_path_2(self): # FULL self.backup_node(backup_dir, 'node', node, options=['--stream']) -# self.run_pb(["archive-push", "-B", backup_dir, -# "--instance=node", "-D", node.data_dir, -# "--wal-file-name=%%f"])#"--wal-file-path=%%p", - log_file = os.path.join(node.logs_dir, 'postgresql.log') with open(log_file, 'r') as f: log_content = f.read() @@ -1903,27 +1899,41 @@ def test_wal_file_path_3(self): self.add_instance(backup_dir, 'node', node) self.set_archiving(backup_dir, 'node', node, compress=True) - wal_dir = os.path.join(backup_dir, 'wal', 'node') - self.set_auto_conf(node, {'archive_command': "cp -r \"{0}\" \"{1}\"".format(backup_dir, wal_dir)}) + wal_dir = os.path.join(self.tmp_path, module_name, fname, 'wal_dir') + os.mkdir(wal_dir) + self.set_config( + backup_dir, 'node', + options=['--log-level-file=VERBOSE']) + self.set_auto_conf(node, {'archive_command': "cp -v %p {0}/%f".format(wal_dir)}) node.slow_start() node.safe_psql( "postgres", "create table t_heap as select i" - " as id from generate_series(0,100) i") + " as id from generate_series(0,1000) i") # FULL self.backup_node(backup_dir, 'node', node, options=['--stream']) + node.safe_psql( + "postgres", + "insert into t_heap select i" + " as id from generate_series(0,1000) i") + self.backup_node(backup_dir, 'node', node, + options=['--stream']) + + filename = '000000010000000000000001' + self.run_pb(["archive-push", "-B", backup_dir, "--instance=node", "-D", node.data_dir, - "--wal-file-path", wal_dir, "--wal-file-name=%f"]) + "--wal-file-path", wal_dir, "--wal-file-name", filename]) - log_file = os.path.join(node.logs_dir, 'postgresql.log') + log_file = os.path.join( + backup_dir, 'log', 'pg_probackup.log') with open(log_file, 'r') as f: log_content = f.read() - self.assertIn('wal_file_path is setted by user', log_content) + self.assertIn('pg_probackup archive-push completed successfully', log_content) self.del_test_dir(module_name, fname) From e1edfa437d96aca76f2d4382419eaa4fc7d2845f Mon Sep 17 00:00:00 2001 From: Daria Lepikhova Date: Mon, 4 Oct 2021 16:29:53 +0300 Subject: [PATCH 07/22] Added warnings for -j and --batch-size setted while --wal-file-path defined by user --- src/archive.c | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/archive.c b/src/archive.c index 2fe9142dd..e6f6712dc 100644 --- a/src/archive.c +++ b/src/archive.c @@ -164,10 +164,15 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path, char backup_wal_file_path[MAXPGPATH]; char absolute_wal_file_path[MAXPGPATH]; - join_path_components(absolute_wal_file_path, current_dir, wal_file_path); + join_path_components(absolute_wal_file_path, wal_file_path, wal_file_name); join_path_components(backup_wal_file_path, instance->arclog_path, wal_file_name); elog(INFO, "wal_file_path is setted by user %s", wal_file_path); + + if ((batch_size > 1)||(num_threads > 1)) + elog(WARNING, "Options -j and --batch-size are ignored " + "with --wal-file-path setted by user"); + if (instance->compress_alg == PGLZ_COMPRESS) elog(ERROR, "pglz compression is not supported"); @@ -959,6 +964,9 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, #endif to_path_p = to_path; +// elog(INFO, "from_path %s", from_path); +// Assert(0); + /* open file for read */ in = fio_fopen(from_path, PG_BINARY_R, FIO_DB_HOST); if (in == NULL) @@ -1066,6 +1074,8 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, read_len = fio_fread(in, buf, sizeof(buf)); +// Assert(0); + if (read_len < 0) { errno_temp = errno; From b3e9072eb72be48aab9d8ad8c21f1571d4a2127c Mon Sep 17 00:00:00 2001 From: "Mikhail A. Kulagin" Date: Mon, 11 Oct 2021 14:03:07 +0300 Subject: [PATCH 08/22] [Issue #439] skip unsupported tests in 9.5 (tests with backups from replica and with pg_control_checkpoint() calling) --- .travis.yml | 15 ++++++++------- tests/archive.py | 29 ++++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/.travis.yml b/.travis.yml index b6b8fd217..738537cd9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,13 +26,14 @@ notifications: # Default MODE is basic, i.e. all tests with PG_PROBACKUP_TEST_BASIC=ON env: - - PG_VERSION=14 PG_BRANCH=REL_14_STABLE - - PG_VERSION=13 PG_BRANCH=REL_13_STABLE - - PG_VERSION=12 PG_BRANCH=REL_12_STABLE - - PG_VERSION=11 PG_BRANCH=REL_11_STABLE - - PG_VERSION=10 PG_BRANCH=REL_10_STABLE - - PG_VERSION=9.6 PG_BRANCH=REL9_6_STABLE - - PG_VERSION=9.5 PG_BRANCH=REL9_5_STABLE + - PG_VERSION=9.5 PG_BRANCH=REL9_5_STABLE MODE=archive +# - PG_VERSION=14 PG_BRANCH=REL_14_STABLE +# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE +# - PG_VERSION=12 PG_BRANCH=REL_12_STABLE +# - PG_VERSION=11 PG_BRANCH=REL_11_STABLE +# - PG_VERSION=10 PG_BRANCH=REL_10_STABLE +# - PG_VERSION=9.6 PG_BRANCH=REL9_6_STABLE +# - PG_VERSION=9.5 PG_BRANCH=REL9_5_STABLE # - PG_VERSION=12 PG_BRANCH=REL_12_STABLE MODE=archive # - PG_VERSION=12 PG_BRANCH=REL_12_STABLE MODE=backup # - PG_VERSION=12 PG_BRANCH=REL_12_STABLE MODE=compression diff --git a/tests/archive.py b/tests/archive.py index 0ade2d66a..4b07c1dbd 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -83,6 +83,12 @@ def test_pgpro434_2(self): pg_options={ 'checkpoint_timeout': '30s'} ) + + if self.get_version(node) < self.version_to_num('9.6.0'): + self.del_test_dir(module_name, fname) + return unittest.skip( + 'Skipped because pg_control_checkpoint() is not supported in PG 9.5') + self.init_pb(backup_dir) self.add_instance(backup_dir, 'node', node) self.set_archiving(backup_dir, 'node', node) @@ -693,6 +699,11 @@ def test_replica_archive(self): 'checkpoint_timeout': '30s', 'max_wal_size': '32MB'}) + if self.get_version(master) < self.version_to_num('9.6.0'): + self.del_test_dir(module_name, fname) + return unittest.skip( + 'Skipped because backup from replica is not supported in PG 9.5') + self.init_pb(backup_dir) # ADD INSTANCE 'MASTER' self.add_instance(backup_dir, 'master', master) @@ -818,6 +829,12 @@ def test_master_and_replica_parallel_archiving(self): pg_options={ 'archive_timeout': '10s'} ) + + if self.get_version(master) < self.version_to_num('9.6.0'): + self.del_test_dir(module_name, fname) + return unittest.skip( + 'Skipped because backup from replica is not supported in PG 9.5') + replica = self.make_simple_node( base_dir=os.path.join(module_name, fname, 'replica')) replica.cleanup() @@ -908,6 +925,11 @@ def test_basic_master_and_replica_concurrent_archiving(self): 'checkpoint_timeout': '30s', 'archive_timeout': '10s'}) + if self.get_version(master) < self.version_to_num('9.6.0'): + self.del_test_dir(module_name, fname) + return unittest.skip( + 'Skipped because backup from replica is not supported in PG 9.5') + replica = self.make_simple_node( base_dir=os.path.join(module_name, fname, 'replica')) replica.cleanup() @@ -2009,6 +2031,11 @@ def test_archive_pg_receivexlog_partial_handling(self): set_replication=True, initdb_params=['--data-checksums']) + if self.get_version(node) < self.version_to_num('9.6.0'): + self.del_test_dir(module_name, fname) + return unittest.skip( + 'Skipped because backup from replica is not supported in PG 9.5') + self.init_pb(backup_dir) self.add_instance(backup_dir, 'node', node) @@ -2655,4 +2682,4 @@ def test_archive_empty_history_file(self): #t2 ---------------- # / #t1 -A-------- -# \ No newline at end of file +# From 4be96c60c85ae848a029267d6b7b2c9164d821e5 Mon Sep 17 00:00:00 2001 From: "Mikhail A. Kulagin" Date: Mon, 11 Oct 2021 15:25:44 +0300 Subject: [PATCH 09/22] [Issue #439] revert .travis.yml --- .travis.yml | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/.travis.yml b/.travis.yml index 738537cd9..b6b8fd217 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,14 +26,13 @@ notifications: # Default MODE is basic, i.e. all tests with PG_PROBACKUP_TEST_BASIC=ON env: - - PG_VERSION=9.5 PG_BRANCH=REL9_5_STABLE MODE=archive -# - PG_VERSION=14 PG_BRANCH=REL_14_STABLE -# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE -# - PG_VERSION=12 PG_BRANCH=REL_12_STABLE -# - PG_VERSION=11 PG_BRANCH=REL_11_STABLE -# - PG_VERSION=10 PG_BRANCH=REL_10_STABLE -# - PG_VERSION=9.6 PG_BRANCH=REL9_6_STABLE -# - PG_VERSION=9.5 PG_BRANCH=REL9_5_STABLE + - PG_VERSION=14 PG_BRANCH=REL_14_STABLE + - PG_VERSION=13 PG_BRANCH=REL_13_STABLE + - PG_VERSION=12 PG_BRANCH=REL_12_STABLE + - PG_VERSION=11 PG_BRANCH=REL_11_STABLE + - PG_VERSION=10 PG_BRANCH=REL_10_STABLE + - PG_VERSION=9.6 PG_BRANCH=REL9_6_STABLE + - PG_VERSION=9.5 PG_BRANCH=REL9_5_STABLE # - PG_VERSION=12 PG_BRANCH=REL_12_STABLE MODE=archive # - PG_VERSION=12 PG_BRANCH=REL_12_STABLE MODE=backup # - PG_VERSION=12 PG_BRANCH=REL_12_STABLE MODE=compression From 3500e9fe01449d9527837a81d0f40464e16428c6 Mon Sep 17 00:00:00 2001 From: Daria Lepikhova Date: Fri, 15 Oct 2021 16:24:33 +0500 Subject: [PATCH 10/22] Used an existing code fragment copying one file into one stream instead of new code --- src/archive.c | 410 ++++++-------------------------------------------- 1 file changed, 49 insertions(+), 361 deletions(-) diff --git a/src/archive.c b/src/archive.c index e6f6712dc..524fd1805 100644 --- a/src/archive.c +++ b/src/archive.c @@ -21,8 +21,6 @@ static int push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_ const char *archive_dir, bool overwrite, bool no_sync, int compress_level, uint32 archive_timeout); #endif -static void push_wal_file(const char *from_path, const char *to_path, - bool is_compress, bool overwrite, int compress_level); static void *push_files(void *arg); static void *get_files(void *arg); static bool get_wal_file(const char *filename, const char *from_path, const char *to_path, @@ -32,12 +30,10 @@ static int get_wal_file_internal(const char *from_path, const char *to_path, FIL #ifdef HAVE_LIBZ static const char *get_gz_error(gzFile gzf, int errnum); #endif -static bool fileEqualCRC(const char *path1, const char *path2, - bool path2_is_compressed); -static void copy_file_attributes(const char *from_path, - fio_location from_location, - const char *to_path, fio_location to_location, - bool unlink_on_error); +// static void copy_file_attributes(const char *from_path, +// fio_location from_location, +// const char *to_path, fio_location to_location, +// bool unlink_on_error); static bool next_wal_segment_exists(TimeLineID tli, XLogSegNo segno, const char *prefetch_dir, uint32 wal_seg_size); static uint32 run_wal_prefetch(const char *prefetch_dir, const char *archive_dir, TimeLineID tli, @@ -161,61 +157,42 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path, if (strcmp(wal_file_path, xlog_wal_path)!=0) { - char backup_wal_file_path[MAXPGPATH]; - char absolute_wal_file_path[MAXPGPATH]; + elog(INFO, "wal_file_path is setted by user %s", wal_file_path); - join_path_components(absolute_wal_file_path, wal_file_path, wal_file_name); - join_path_components(backup_wal_file_path, instance->arclog_path, wal_file_name); + join_path_components(pg_xlog_dir, instance->pgdata, XLOGDIR); + } + else + { + /* Create 'archlog_path' directory. Do nothing if it already exists. */ + //fio_mkdir(instance->arclog_path, DIR_PERMISSION, FIO_BACKUP_HOST); - elog(INFO, "wal_file_path is setted by user %s", wal_file_path); + if (!getcwd(current_dir, sizeof(current_dir))) + elog(ERROR, "getcwd() error"); - if ((batch_size > 1)||(num_threads > 1)) - elog(WARNING, "Options -j and --batch-size are ignored " - "with --wal-file-path setted by user"); + /* verify that archive-push --instance parameter is valid */ + system_id = get_system_identifier(current_dir); - if (instance->compress_alg == PGLZ_COMPRESS) - elog(ERROR, "pglz compression is not supported"); + if (instance->pgdata == NULL) + elog(ERROR, "Cannot read pg_probackup.conf for this instance"); -#ifdef HAVE_LIBZ - if (instance->compress_alg == ZLIB_COMPRESS) - is_compress = IsXLogFileName(wal_file_name); - #endif + if (system_id != instance->system_identifier) + elog(ERROR, "Refuse to push WAL segment %s into archive. Instance parameters mismatch." + "Instance '%s' should have SYSTEM_ID = " UINT64_FORMAT " instead of " UINT64_FORMAT, + wal_file_name, instance->name, instance->system_identifier, system_id); - push_wal_file(absolute_wal_file_path, backup_wal_file_path, is_compress, - overwrite, instance->compress_level); - elog(INFO, "pg_probackup archive-push completed successfully"); + if (instance->compress_alg == PGLZ_COMPRESS) + elog(ERROR, "Cannot use pglz for WAL compression"); - return; + join_path_components(pg_xlog_dir, current_dir, XLOGDIR); } - /* Create 'archlog_path' directory. Do nothing if it already exists. */ - //fio_mkdir(instance->arclog_path, DIR_PERMISSION, FIO_BACKUP_HOST); + join_path_components(archive_status_dir, pg_xlog_dir, "archive_status"); #ifdef HAVE_LIBZ if (instance->compress_alg == ZLIB_COMPRESS) is_compress = true; #endif - if (!getcwd(current_dir, sizeof(current_dir))) - elog(ERROR, "getcwd() error"); - - /* verify that archive-push --instance parameter is valid */ - system_id = get_system_identifier(current_dir); - - if (instance->pgdata == NULL) - elog(ERROR, "Cannot read pg_probackup.conf for this instance"); - - if (system_id != instance->system_identifier) - elog(ERROR, "Refuse to push WAL segment %s into archive. Instance parameters mismatch." - "Instance '%s' should have SYSTEM_ID = " UINT64_FORMAT " instead of " UINT64_FORMAT, - wal_file_name, instance->name, instance->system_identifier, system_id); - - if (instance->compress_alg == PGLZ_COMPRESS) - elog(ERROR, "Cannot use pglz for WAL compression"); - - join_path_components(pg_xlog_dir, current_dir, XLOGDIR); - join_path_components(archive_status_dir, pg_xlog_dir, "archive_status"); - /* Setup filelist and locks */ batch_files = setup_push_filelist(archive_status_dir, wal_file_name, batch_size); @@ -932,237 +909,6 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir, } #endif -/* - * Copy WAL segment from pgdata to archive catalog with possible compression. - */ -void -push_wal_file(const char *from_path, const char *to_path, bool is_compress, - bool overwrite, int compress_level) -{ - FILE *in = NULL; - int out = -1; - char buf[XLOG_BLCKSZ]; - const char *to_path_p; - char to_path_temp[MAXPGPATH]; - int errno_temp; - /* partial handling */ - struct stat st; - int partial_try_count = 0; - int partial_file_size = 0; - bool partial_file_exists = false; - -#ifdef HAVE_LIBZ - char gz_to_path[MAXPGPATH]; - gzFile gz_out = NULL; - - if (is_compress) - { - snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path); - to_path_p = gz_to_path; - } - else -#endif - to_path_p = to_path; - -// elog(INFO, "from_path %s", from_path); -// Assert(0); - - /* open file for read */ - in = fio_fopen(from_path, PG_BINARY_R, FIO_DB_HOST); - if (in == NULL) - elog(ERROR, "Cannot open source WAL file \"%s\": %s", from_path, - strerror(errno)); - - /* Check if possible to skip copying */ - if (fileExists(to_path_p, FIO_BACKUP_HOST)) - { - if (fileEqualCRC(from_path, to_path_p, is_compress)) - return; - /* Do not copy and do not rise error. Just quit as normal. */ - else if (!overwrite) - elog(ERROR, "WAL segment \"%s\" already exists.", to_path_p); - } - - /* open backup file for write */ -#ifdef HAVE_LIBZ - if (is_compress) - { - snprintf(to_path_temp, sizeof(to_path_temp), "%s.part", gz_to_path); - - gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, compress_level, FIO_BACKUP_HOST); - if (gz_out == NULL) - { - partial_file_exists = true; - elog(WARNING, "Cannot open destination temporary WAL file \"%s\": %s", - to_path_temp, strerror(errno)); - } - } - else -#endif - { - snprintf(to_path_temp, sizeof(to_path_temp), "%s.part", to_path); - - out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_BACKUP_HOST); - if (out < 0) - { - partial_file_exists = true; - elog(WARNING, "Cannot open destination temporary WAL file \"%s\": %s", - to_path_temp, strerror(errno)); - } - } - - /* Partial file is already exists, it could have happened due to failed archive-push, - * in this case partial file can be discarded, or due to concurrent archiving. - * - * Our main goal here is to try to handle partial file to prevent stalling of - * continious archiving. - * To ensure that ecncountered partial file is actually a stale "orphaned" file, - * check its size every second. - * If the size has not changed in PARTIAL_WAL_TIMER seconds, we can consider - * the file stale and reuse it. - * If file size is changing, it means that another archiver works at the same - * directory with the same files. Such partial files cannot be reused. - */ - if (partial_file_exists) - { - while (partial_try_count < PARTIAL_WAL_TIMER) - { - - if (fio_stat(to_path_temp, &st, false, FIO_BACKUP_HOST) < 0) - /* It is ok if partial is gone, we can safely error out */ - elog(ERROR, "Cannot stat destination temporary WAL file \"%s\": %s", to_path_temp, - strerror(errno)); - - /* first round */ - if (!partial_try_count) - partial_file_size = st.st_size; - - /* file size is changing */ - if (st.st_size > partial_file_size) - elog(ERROR, "Destination temporary WAL file \"%s\" is not stale", to_path_temp); - - sleep(1); - partial_try_count++; - } - - /* Partial segment is considered stale, so reuse it */ - elog(WARNING, "Reusing stale destination temporary WAL file \"%s\"", to_path_temp); - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - -#ifdef HAVE_LIBZ - if (is_compress) - { - gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, compress_level, FIO_BACKUP_HOST); - if (gz_out == NULL) - elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s", - to_path_temp, strerror(errno)); - } - else -#endif - { - out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_BACKUP_HOST); - if (out < 0) - elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s", - to_path_temp, strerror(errno)); - } - } - - /* copy content */ - for (;;) - { - ssize_t read_len = 0; - - read_len = fio_fread(in, buf, sizeof(buf)); - -// Assert(0); - - if (read_len < 0) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, - "Cannot read source WAL file \"%s\": %s", - from_path, strerror(errno_temp)); - } - - if (read_len > 0) - { -#ifdef HAVE_LIBZ - if (is_compress) - { - if (fio_gzwrite(gz_out, buf, read_len) != read_len) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, "Cannot write to compressed WAL file \"%s\": %s", - to_path_temp, get_gz_error(gz_out, errno_temp)); - } - } - else -#endif - { - if (fio_write(out, buf, read_len) != read_len) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, "Cannot write to WAL file \"%s\": %s", - to_path_temp, strerror(errno_temp)); - } - } - } - - if (read_len == 0) - break; - } - -#ifdef HAVE_LIBZ - if (is_compress) - { - if (fio_gzclose(gz_out) != 0) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", - to_path_temp, get_gz_error(gz_out, errno_temp)); - } - } - else -#endif - { - if (fio_flush(out) != 0 || fio_close(out) != 0) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, "Cannot write WAL file \"%s\": %s", - to_path_temp, strerror(errno_temp)); - } - } - - if (fio_fclose(in)) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, "Cannot close source WAL file \"%s\": %s", - from_path, strerror(errno_temp)); - } - - /* update file permission. */ - copy_file_attributes(from_path, FIO_DB_HOST, to_path_temp, FIO_BACKUP_HOST, true); - - if (fio_rename(to_path_temp, to_path_p, FIO_BACKUP_HOST) < 0) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s", - to_path_temp, to_path_p, strerror(errno_temp)); - } - -#ifdef HAVE_LIBZ - if (is_compress) - elog(INFO, "WAL file compressed to \"%s\"", gz_to_path); -#endif -} - #ifdef HAVE_LIBZ /* * Show error during work with compressed file @@ -1181,88 +927,30 @@ get_gz_error(gzFile gzf, int errnum) } #endif -/* - * compare CRC of two WAL files. - * If necessary, decompress WAL file from path2 - */ -static bool -fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) -{ - pg_crc32 crc1; - pg_crc32 crc2; - - /* Get checksum of backup file */ -#ifdef HAVE_LIBZ - if (path2_is_compressed) - { - char buf [1024]; - gzFile gz_in = NULL; - - INIT_FILE_CRC32(true, crc2); - gz_in = fio_gzopen(path2, PG_BINARY_R, Z_DEFAULT_COMPRESSION, FIO_BACKUP_HOST); - if (gz_in == NULL) - /* File cannot be read */ - elog(ERROR, - "Cannot compare WAL file \"%s\" with compressed \"%s\"", - path1, path2); - - for (;;) - { - int read_len = fio_gzread(gz_in, buf, sizeof(buf)); - if (read_len <= 0 && !fio_gzeof(gz_in)) - { - /* An error occurred while reading the file */ - elog(WARNING, - "Cannot compare WAL file \"%s\" with compressed \"%s\": %d", - path1, path2, read_len); - return false; - } - COMP_FILE_CRC32(true, crc2, buf, read_len); - if (fio_gzeof(gz_in) || read_len == 0) - break; - } - FIN_FILE_CRC32(true, crc2); - - if (fio_gzclose(gz_in) != 0) - elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", - path2, get_gz_error(gz_in, errno)); - } - else -#endif - { - crc2 = fio_get_crc32(path2, FIO_BACKUP_HOST, true); - } - - /* Get checksum of original file */ - crc1 = fio_get_crc32(path1, FIO_DB_HOST, true); - - return EQ_CRC32C(crc1, crc2); -} - -/* Copy file attributes */ -static void -copy_file_attributes(const char *from_path, fio_location from_location, - const char *to_path, fio_location to_location, - bool unlink_on_error) -{ - struct stat st; - - if (fio_stat(from_path, &st, true, from_location) == -1) - { - if (unlink_on_error) - fio_unlink(to_path, to_location); - elog(ERROR, "Cannot stat file \"%s\": %s", - from_path, strerror(errno)); - } - - if (fio_chmod(to_path, st.st_mode, to_location) == -1) - { - if (unlink_on_error) - fio_unlink(to_path, to_location); - elog(ERROR, "Cannot change mode of file \"%s\": %s", - to_path, strerror(errno)); - } -} +// /* Copy file attributes */ +// static void +// copy_file_attributes(const char *from_path, fio_location from_location, +// const char *to_path, fio_location to_location, +// bool unlink_on_error) +// { +// struct stat st; + +// if (fio_stat(from_path, &st, true, from_location) == -1) +// { +// if (unlink_on_error) +// fio_unlink(to_path, to_location); +// elog(ERROR, "Cannot stat file \"%s\": %s", +// from_path, strerror(errno)); +// } + +// if (fio_chmod(to_path, st.st_mode, to_location) == -1) +// { +// if (unlink_on_error) +// fio_unlink(to_path, to_location); +// elog(ERROR, "Cannot change mode of file \"%s\": %s", +// to_path, strerror(errno)); +// } +// } /* Look for files with '.ready' suffix in archive_status directory * and pack such files into batch sized array. From 17bd528c60d7b3d736c3e3d3344fe0092745dd32 Mon Sep 17 00:00:00 2001 From: "Mikhail A. Kulagin" Date: Mon, 15 Nov 2021 11:37:05 +0300 Subject: [PATCH 11/22] [PGPRO-5421] cleanup patch code --- src/archive.c | 67 +++++++++++++++++++++++----------------------- src/pg_probackup.h | 1 - 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/src/archive.c b/src/archive.c index 326daf6cd..2082cced1 100644 --- a/src/archive.c +++ b/src/archive.c @@ -30,10 +30,10 @@ static int get_wal_file_internal(const char *from_path, const char *to_path, FIL #ifdef HAVE_LIBZ static const char *get_gz_error(gzFile gzf, int errnum); #endif -// static void copy_file_attributes(const char *from_path, -// fio_location from_location, -// const char *to_path, fio_location to_location, -// bool unlink_on_error); +//static void copy_file_attributes(const char *from_path, +// fio_location from_location, +// const char *to_path, fio_location to_location, +// bool unlink_on_error); static bool next_wal_segment_exists(TimeLineID tli, XLogSegNo segno, const char *prefetch_dir, uint32 wal_seg_size); static uint32 run_wal_prefetch(const char *prefetch_dir, const char *archive_dir, TimeLineID tli, @@ -120,6 +120,7 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa uint64 i; char current_dir[MAXPGPATH]; char pg_xlog_dir[MAXPGPATH]; + char xlog_wal_path[MAXPGPATH]; char archive_status_dir[MAXPGPATH]; uint64 system_id; bool is_compress = false; @@ -141,8 +142,6 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa parray *batch_files = NULL; int n_threads; - char xlog_wal_path[MAXPGPATH]; - if (wal_file_name == NULL) elog(ERROR, "Required parameter is not specified: --wal-file-name %%f"); @@ -150,14 +149,14 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa if (wal_file_path == NULL) { - elog(INFO, "Required parameter is not specified: --wal_file_path %%p " + elog(INFO, "Optional parameter is not specified: --wal_file_path %%p " "Setting wal-file-path by default"); wal_file_path = xlog_wal_path; } - if (strcmp(wal_file_path, xlog_wal_path)!=0) + if (strcmp(wal_file_path, xlog_wal_path) != 0) { - elog(INFO, "wal_file_path is setted by user %s", wal_file_path); + elog(INFO, "wal_file_path is setted by user %s", wal_file_path); join_path_components(pg_xlog_dir, instance->pgdata, XLOGDIR); } @@ -178,7 +177,7 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa if (system_id != instance->system_identifier) elog(ERROR, "Refuse to push WAL segment %s into archive. Instance parameters mismatch." "Instance '%s' should have SYSTEM_ID = " UINT64_FORMAT " instead of " UINT64_FORMAT, - wal_file_name, instance->name, instance->system_identifier, system_id); + wal_file_name, instanceState->instance_name, instance->system_identifier, system_id); if (instance->compress_alg == PGLZ_COMPRESS) elog(ERROR, "Cannot use pglz for WAL compression"); @@ -927,30 +926,30 @@ get_gz_error(gzFile gzf, int errnum) } #endif -// /* Copy file attributes */ -// static void -// copy_file_attributes(const char *from_path, fio_location from_location, -// const char *to_path, fio_location to_location, -// bool unlink_on_error) -// { -// struct stat st; - -// if (fio_stat(from_path, &st, true, from_location) == -1) -// { -// if (unlink_on_error) -// fio_unlink(to_path, to_location); -// elog(ERROR, "Cannot stat file \"%s\": %s", -// from_path, strerror(errno)); -// } - -// if (fio_chmod(to_path, st.st_mode, to_location) == -1) -// { -// if (unlink_on_error) -// fio_unlink(to_path, to_location); -// elog(ERROR, "Cannot change mode of file \"%s\": %s", -// to_path, strerror(errno)); -// } -// } +/* Copy file attributes */ +//static void +//copy_file_attributes(const char *from_path, fio_location from_location, +// const char *to_path, fio_location to_location, +// bool unlink_on_error) +//{ +// struct stat st; +// +// if (fio_stat(from_path, &st, true, from_location) == -1) +// { +// if (unlink_on_error) +// fio_unlink(to_path, to_location); +// elog(ERROR, "Cannot stat file \"%s\": %s", +// from_path, strerror(errno)); +// } +// +// if (fio_chmod(to_path, st.st_mode, to_location) == -1) +// { +// if (unlink_on_error) +// fio_unlink(to_path, to_location); +// elog(ERROR, "Cannot change mode of file \"%s\": %s", +// to_path, strerror(errno)); +// } +//} /* Look for files with '.ready' suffix in archive_status directory * and pack such files into batch sized array. diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 74233d517..6a1feb014 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -91,7 +91,6 @@ extern const char *PROGRAM_EMAIL; #define DEFAULT_PERMANENT_SLOT_NAME "pg_probackup_perm_slot"; /* Timeout defaults */ -#define PARTIAL_WAL_TIMER 60 #define ARCHIVE_TIMEOUT_DEFAULT 300 #define REPLICA_TIMEOUT_DEFAULT 300 #define LOCK_TIMEOUT 60 From f601238d872d6a7e5499e3ce12e02f7c9c9b9ffd Mon Sep 17 00:00:00 2001 From: "Mikhail A. Kulagin" Date: Mon, 15 Nov 2021 11:53:47 +0300 Subject: [PATCH 12/22] [PGPRO-5421] modify test_wal_file_path_3 test to prevent pushing wal files from pgdata --- src/archive.c | 4 +++- tests/archive.py | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/archive.c b/src/archive.c index 2082cced1..cc639bd7a 100644 --- a/src/archive.c +++ b/src/archive.c @@ -119,9 +119,11 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa { uint64 i; char current_dir[MAXPGPATH]; + /* directory with wal files to be archived (usually instance pgdata/pg_wal) */ char pg_xlog_dir[MAXPGPATH]; - char xlog_wal_path[MAXPGPATH]; + /* usually instance pgdata/pg_wal/archive_status */ char archive_status_dir[MAXPGPATH]; + char xlog_wal_path[MAXPGPATH]; uint64 system_id; bool is_compress = false; diff --git a/tests/archive.py b/tests/archive.py index f22182d0c..3c1124bbe 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -1837,7 +1837,6 @@ def test_wal_file_path(self): """ fname = self.id().split('.')[3] backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') - self.set_archiving node = self.make_simple_node( base_dir=os.path.join(module_name, fname, 'node'), set_replication=True, @@ -1920,7 +1919,8 @@ def test_wal_file_path_3(self): set_replication=True, initdb_params=['--data-checksums'], pg_options={ - 'archive_mode': 'on'}) + 'archive_mode': 'on', + 'wal_keep_size' : '0'}) self.init_pb(backup_dir) self.add_instance(backup_dir, 'node', node) From 5bc3fb25cb8a101bc8a11288e6678cdfc8ca6858 Mon Sep 17 00:00:00 2001 From: "Mikhail A. Kulagin" Date: Mon, 15 Nov 2021 18:28:50 +0300 Subject: [PATCH 13/22] [PGPRO-5421] fix test_wal_file_path_3 portability --- tests/archive.py | 30 +++++++++------ tests/helpers/ptrack_helpers.py | 65 +++++++++++++++++---------------- 2 files changed, 52 insertions(+), 43 deletions(-) diff --git a/tests/archive.py b/tests/archive.py index 3c1124bbe..fb6bd2989 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -1914,24 +1914,30 @@ def test_wal_file_path_3(self): """ fname = self.id().split('.')[3] backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') + node = self.make_simple_node( base_dir=os.path.join(module_name, fname, 'node'), - set_replication=True, - initdb_params=['--data-checksums'], - pg_options={ - 'archive_mode': 'on', - 'wal_keep_size' : '0'}) + initdb_params=['--data-checksums']) + + node_pg_options = {} + if node.major_version >= 13: + node_pg_options['wal_keep_size'] = '0MB' + else: + node_pg_options['wal_keep_segments'] = '0' + self.set_auto_conf(node, node_pg_options) self.init_pb(backup_dir) self.add_instance(backup_dir, 'node', node) - self.set_archiving(backup_dir, 'node', node, compress=True) - wal_dir = os.path.join(self.tmp_path, module_name, fname, 'wal_dir') - os.mkdir(wal_dir) - self.set_config( - backup_dir, 'node', - options=['--log-level-file=VERBOSE']) - self.set_auto_conf(node, {'archive_command': "cp -v %p {0}/%f".format(wal_dir)}) + wal_dir = os.path.join(self.tmp_path, module_name, fname, 'intermediate_dir') + shutil.rmtree(wal_dir, ignore_errors=True) + os.makedirs(wal_dir) + if os.name == 'posix': + self.set_archiving(backup_dir, 'node', node, custom_archive_command='cp -v %p {0}/%f'.format(wal_dir)) + elif os.name == 'nt': + self.set_archiving(backup_dir, 'node', node, custom_archive_command='xcopy /F "%p" "{0}/%f"'.format(wal_dir.replace("\\","\\\\"))) + else: + self.assertTrue(False, 'Unexpected os family') node.slow_start() node.safe_psql( diff --git a/tests/helpers/ptrack_helpers.py b/tests/helpers/ptrack_helpers.py index 1b54d3165..3b14b7170 100644 --- a/tests/helpers/ptrack_helpers.py +++ b/tests/helpers/ptrack_helpers.py @@ -1296,7 +1296,8 @@ def get_recovery_conf(self, node): def set_archiving( self, backup_dir, instance, node, replica=False, overwrite=False, compress=True, old_binary=False, - log_level=False, archive_timeout=False): + log_level=False, archive_timeout=False, + custom_archive_command=None): # parse postgresql.auto.conf options = {} @@ -1306,45 +1307,47 @@ def set_archiving( else: options['archive_mode'] = 'on' - if os.name == 'posix': - options['archive_command'] = '"{0}" archive-push -B {1} --instance={2} '.format( - self.probackup_path, backup_dir, instance) - - elif os.name == 'nt': - options['archive_command'] = '"{0}" archive-push -B {1} --instance={2} '.format( - self.probackup_path.replace("\\","\\\\"), - backup_dir.replace("\\","\\\\"), instance) + if custom_archive_command is None: + if os.name == 'posix': + options['archive_command'] = '"{0}" archive-push -B {1} --instance={2} '.format( + self.probackup_path, backup_dir, instance) - # don`t forget to kill old_binary after remote ssh release - if self.remote and not old_binary: - options['archive_command'] += '--remote-proto=ssh ' - options['archive_command'] += '--remote-host=localhost ' + elif os.name == 'nt': + options['archive_command'] = '"{0}" archive-push -B {1} --instance={2} '.format( + self.probackup_path.replace("\\","\\\\"), + backup_dir.replace("\\","\\\\"), instance) - if self.archive_compress and compress: - options['archive_command'] += '--compress ' + # don`t forget to kill old_binary after remote ssh release + if self.remote and not old_binary: + options['archive_command'] += '--remote-proto=ssh ' + options['archive_command'] += '--remote-host=localhost ' - if overwrite: - options['archive_command'] += '--overwrite ' + if self.archive_compress and compress: + options['archive_command'] += '--compress ' - options['archive_command'] += '--log-level-console=VERBOSE ' - options['archive_command'] += '-j 5 ' - options['archive_command'] += '--batch-size 10 ' - options['archive_command'] += '--no-sync ' + if overwrite: + options['archive_command'] += '--overwrite ' - if archive_timeout: - options['archive_command'] += '--archive-timeout={0} '.format( - archive_timeout) + options['archive_command'] += '--log-level-console=VERBOSE ' + options['archive_command'] += '-j 5 ' + options['archive_command'] += '--batch-size 10 ' + options['archive_command'] += '--no-sync ' - if os.name == 'posix': - options['archive_command'] += '--wal-file-path=%p --wal-file-name=%f' + if archive_timeout: + options['archive_command'] += '--archive-timeout={0} '.format( + archive_timeout) - elif os.name == 'nt': - options['archive_command'] += '--wal-file-path="%p" --wal-file-name="%f"' + if os.name == 'posix': + options['archive_command'] += '--wal-file-path=%p --wal-file-name=%f' - if log_level: - options['archive_command'] += ' --log-level-console={0}'.format(log_level) - options['archive_command'] += ' --log-level-file={0} '.format(log_level) + elif os.name == 'nt': + options['archive_command'] += '--wal-file-path="%p" --wal-file-name="%f"' + if log_level: + options['archive_command'] += ' --log-level-console={0}'.format(log_level) + options['archive_command'] += ' --log-level-file={0} '.format(log_level) + else: # custom_archive_command is not None + options['archive_command'] = custom_archive_command self.set_auto_conf(node, options) From a9f3e806cc9647d3ea983753e0ce243a847acfc1 Mon Sep 17 00:00:00 2001 From: "Mikhail A. Kulagin" Date: Tue, 16 Nov 2021 02:36:19 +0300 Subject: [PATCH 14/22] [PGPRO-5421] rewrite --wal-file-path logic and tests --- .travis.yml | 1 - src/archive.c | 75 ++++++---------------------- src/pg_probackup.c | 88 +++++++++++++++++++++++++++++++-- src/pg_probackup.h | 2 +- src/utils/file.c | 27 ++++++++++ src/utils/file.h | 1 + src/utils/pgut.c | 16 ++++++ src/utils/pgut.h | 1 + tests/archive.py | 119 +++++++++++++++++++-------------------------- 9 files changed, 194 insertions(+), 136 deletions(-) diff --git a/.travis.yml b/.travis.yml index f1c0cef59..62ed559c2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -46,7 +46,6 @@ env: # - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=REL_13_STABLE MODE=replica # - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=off MODE=retention # - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=REL_13_STABLE MODE=restore - - PG_VERSION=15 PG_BRANCH=master - PG_VERSION=14 PG_BRANCH=REL_14_STABLE MODE=archive - PG_VERSION=13 PG_BRANCH=REL_13_STABLE MODE=archive - PG_VERSION=12 PG_BRANCH=REL_12_STABLE MODE=archive diff --git a/src/archive.c b/src/archive.c index cc639bd7a..0f32d9345 100644 --- a/src/archive.c +++ b/src/archive.c @@ -3,7 +3,7 @@ * archive.c: - pg_probackup specific archive commands for archive backups. * * - * Portions Copyright (c) 2018-2019, Postgres Professional + * Portions Copyright (c) 2018-2021, Postgres Professional * *------------------------------------------------------------------------- */ @@ -113,18 +113,13 @@ static parray *setup_push_filelist(const char *archive_status_dir, * Where archlog_path is $BACKUP_PATH/wal/instance_name */ void -do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wal_file_path, +do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *pg_xlog_dir, char *wal_file_name, int batch_size, bool overwrite, bool no_sync, bool no_ready_rename) { uint64 i; - char current_dir[MAXPGPATH]; - /* directory with wal files to be archived (usually instance pgdata/pg_wal) */ - char pg_xlog_dir[MAXPGPATH]; - /* usually instance pgdata/pg_wal/archive_status */ - char archive_status_dir[MAXPGPATH]; - char xlog_wal_path[MAXPGPATH]; - uint64 system_id; + /* usually instance pgdata/pg_wal/archive_status, empty if no_ready_rename or batch_size == 1 */ + char archive_status_dir[MAXPGPATH] = ""; bool is_compress = false; /* arrays with meta info for multi threaded backup */ @@ -144,50 +139,8 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa parray *batch_files = NULL; int n_threads; - if (wal_file_name == NULL) - elog(ERROR, "Required parameter is not specified: --wal-file-name %%f"); - - join_path_components(xlog_wal_path, PG_XLOG_DIR, wal_file_name); - - if (wal_file_path == NULL) - { - elog(INFO, "Optional parameter is not specified: --wal_file_path %%p " - "Setting wal-file-path by default"); - wal_file_path = xlog_wal_path; - } - - if (strcmp(wal_file_path, xlog_wal_path) != 0) - { - elog(INFO, "wal_file_path is setted by user %s", wal_file_path); - - join_path_components(pg_xlog_dir, instance->pgdata, XLOGDIR); - } - else - { - /* Create 'archlog_path' directory. Do nothing if it already exists. */ - //fio_mkdir(instance->arclog_path, DIR_PERMISSION, FIO_BACKUP_HOST); - - if (!getcwd(current_dir, sizeof(current_dir))) - elog(ERROR, "getcwd() error"); - - /* verify that archive-push --instance parameter is valid */ - system_id = get_system_identifier(current_dir, FIO_DB_HOST); - - if (instance->pgdata == NULL) - elog(ERROR, "Cannot read pg_probackup.conf for this instance"); - - if (system_id != instance->system_identifier) - elog(ERROR, "Refuse to push WAL segment %s into archive. Instance parameters mismatch." - "Instance '%s' should have SYSTEM_ID = " UINT64_FORMAT " instead of " UINT64_FORMAT, - wal_file_name, instanceState->instance_name, instance->system_identifier, system_id); - - if (instance->compress_alg == PGLZ_COMPRESS) - elog(ERROR, "Cannot use pglz for WAL compression"); - - join_path_components(pg_xlog_dir, current_dir, XLOGDIR); - } - - join_path_components(archive_status_dir, pg_xlog_dir, "archive_status"); + if (!no_ready_rename || batch_size > 1) + join_path_components(archive_status_dir, pg_xlog_dir, "archive_status"); #ifdef HAVE_LIBZ if (instance->compress_alg == ZLIB_COMPRESS) @@ -226,12 +179,13 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa { int rc; WALSegno *xlogfile = (WALSegno *) parray_get(batch_files, i); + bool first_wal = strcmp(xlogfile->name, wal_file_name) == 0; - rc = push_file(xlogfile, archive_status_dir, + rc = push_file(xlogfile, first_wal ? NULL : archive_status_dir, pg_xlog_dir, instanceState->instance_wal_subdir_path, overwrite, no_sync, instance->archive_timeout, - no_ready_rename || (strcmp(xlogfile->name, wal_file_name) == 0) ? true : false, + no_ready_rename || first_wal, is_compress && IsXLogFileName(xlogfile->name) ? true : false, instance->compress_level); if (rc == 0) @@ -255,7 +209,7 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa arg->first_filename = wal_file_name; arg->archive_dir = instanceState->instance_wal_subdir_path; arg->pg_xlog_dir = pg_xlog_dir; - arg->archive_status_dir = archive_status_dir; + arg->archive_status_dir = (!no_ready_rename || batch_size > 1) ? archive_status_dir : NULL; arg->overwrite = overwrite; arg->compress = is_compress; arg->no_sync = no_sync; @@ -298,7 +252,7 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa /* Note, that we are leaking memory here, * because pushing into archive is a very - * time-sensetive operation, so we skip freeing stuff. + * time-sensitive operation, so we skip freeing stuff. */ push_done: @@ -378,9 +332,6 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir, int compress_level) { int rc; - char wal_file_dummy[MAXPGPATH]; - - join_path_components(wal_file_dummy, archive_status_dir, xlogfile->name); elog(LOG, "pushing file \"%s\"", xlogfile->name); @@ -397,11 +348,13 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir, #endif /* take '--no-ready-rename' flag into account */ - if (!no_ready_rename) + if (!no_ready_rename && archive_status_dir != NULL) { + char wal_file_dummy[MAXPGPATH]; char wal_file_ready[MAXPGPATH]; char wal_file_done[MAXPGPATH]; + join_path_components(wal_file_dummy, archive_status_dir, xlogfile->name); snprintf(wal_file_ready, MAXPGPATH, "%s.%s", wal_file_dummy, "ready"); snprintf(wal_file_done, MAXPGPATH, "%s.%s", wal_file_dummy, "done"); diff --git a/src/pg_probackup.c b/src/pg_probackup.c index d629d838d..ad5edf1ea 100644 --- a/src/pg_probackup.c +++ b/src/pg_probackup.c @@ -35,7 +35,7 @@ * which includes info about pgdata directory and connection. * * Portions Copyright (c) 2009-2013, NIPPON TELEGRAPH AND TELEPHONE CORPORATION - * Portions Copyright (c) 2015-2019, Postgres Professional + * Portions Copyright (c) 2015-2021, Postgres Professional * *------------------------------------------------------------------------- */ @@ -151,6 +151,7 @@ static char *wal_file_path; static char *wal_file_name; static bool file_overwrite = false; static bool no_ready_rename = false; +static char archive_push_xlog_dir[MAXPGPATH] = ""; /* archive get options */ static char *prefetch_dir; @@ -788,7 +789,7 @@ main(int argc, char *argv[]) current.stream = stream_wal = true; if (instance_config.external_dir_str) elog(ERROR, "external directories not supported fom \"%s\" command", get_subcmd_name(backup_subcmd)); - // TODO проверить instance_config.conn_opt + // TODO check instance_config.conn_opt } /* sanity */ @@ -796,6 +797,87 @@ main(int argc, char *argv[]) elog(ERROR, "You cannot specify \"--no-validate\" option with the \"%s\" command", get_subcmd_name(backup_subcmd)); + if (backup_subcmd == ARCHIVE_PUSH_CMD) + { + /* Check archive-push parameters and construct archive_push_xlog_dir + * + * There are 3 cases: + * 1. no --wal-file-path specified -- use cwd, ./PG_XLOG_DIR for wal files + * (and ./PG_XLOG_DIR/archive_status for .done files inside do_archive_push()) + * in this case we can use batches and threads + * 2. --wal-file-path is specified and it is the same dir as stored in pg_probackup.conf (instance_config.pgdata) + * in this case we can use this path, as well as batches and thread + * 3. --wal-file-path is specified and it is different from instance_config.pgdata + * disable optimizations and work with user specified path + */ + bool check_system_id = true; + uint64 system_id; + + if (wal_file_name == NULL) + elog(ERROR, "Required parameter is not specified: --wal-file-name %%f"); + + if (instance_config.pgdata == NULL) + elog(ERROR, "Cannot read pg_probackup.conf for this instance"); + + /* TODO may be remove in preference of checking inside compress_init()? */ + if (instance_config.compress_alg == PGLZ_COMPRESS) + elog(ERROR, "Cannot use pglz for WAL compression"); + + if (wal_file_path == NULL) + { + /* 1st case */ + char current_dir[MAXPGPATH]; + if (!getcwd(current_dir, sizeof(current_dir))) + elog(ERROR, "getcwd() error"); + + system_id = get_system_identifier(current_dir, FIO_DB_HOST); + join_path_components(archive_push_xlog_dir, current_dir, XLOGDIR); + } + else + { + /* + * Usually we get something like + * wal_file_path = "pg_wal/0000000100000000000000A1" + * wal_file_name = "0000000100000000000000A1" + * instance_config.pgdata = "/pgdata/.../node/data" + * We need to strip wal_file_name from wal_file_path, add XLOGDIR to instance_config.pgdata + * and compare this directories. + * Note, that pg_wal can be symlink (see test_waldir_outside_pgdata_archiving) + */ + char *stripped_wal_file_path = pgut_str_strip_trailing_filename(wal_file_path, wal_file_name); + join_path_components(archive_push_xlog_dir, instance_config.pgdata, XLOGDIR); + if (fio_is_same_file(stripped_wal_file_path, archive_push_xlog_dir, true, FIO_DB_HOST)) + { + /* 2nd case */ + system_id = get_system_identifier(instance_config.pgdata, FIO_DB_HOST); + /* archive_push_xlog_dir already have right value */ + } + else + { + /* 3rd case */ + check_system_id = false; + if (strlen(stripped_wal_file_path) < MAXPGPATH) + strncpy(archive_push_xlog_dir, stripped_wal_file_path, MAXPGPATH); + else + elog(ERROR, "Value specified to --wal_file_path is too long"); + + if (batch_size > 1 || num_threads > 1 || !no_ready_rename) + { + elog(WARNING, "Supplied --wal_file_path is outside pgdata, force safe values for options: --batch-size=1 -j 1 --no-ready-rename"); + batch_size = 1; + num_threads = 1; + no_ready_rename = true; + } + } + pfree(stripped_wal_file_path); + } + + if (check_system_id && system_id != instance_config.system_identifier) + elog(ERROR, "Refuse to push WAL segment %s into archive. Instance parameters mismatch." + "Instance '%s' should have SYSTEM_ID = " UINT64_FORMAT " instead of " UINT64_FORMAT, + wal_file_name, instanceState->instance_name, instance_config.system_identifier, system_id); + } + #if PG_VERSION_NUM >= 100000 if (temp_slot && perm_slot) elog(ERROR, "You cannot specify \"--perm-slot\" option with the \"--temp-slot\" option"); @@ -819,7 +901,7 @@ main(int argc, char *argv[]) switch (backup_subcmd) { case ARCHIVE_PUSH_CMD: - do_archive_push(instanceState, &instance_config, wal_file_path, wal_file_name, + do_archive_push(instanceState, &instance_config, archive_push_xlog_dir, wal_file_name, batch_size, file_overwrite, no_sync, no_ready_rename); break; case ARCHIVE_GET_CMD: diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 6a1feb014..4e30eb477 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -889,7 +889,7 @@ extern int do_init(CatalogState *catalogState); extern int do_add_instance(InstanceState *instanceState, InstanceConfig *instance); /* in archive.c */ -extern void do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wal_file_path, +extern void do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *pg_xlog_dir, char *wal_file_name, int batch_size, bool overwrite, bool no_sync, bool no_ready_rename); extern void do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const char *prefetch_dir_arg, char *wal_file_path, diff --git a/src/utils/file.c b/src/utils/file.c index 810b4b394..7d1df554b 100644 --- a/src/utils/file.c +++ b/src/utils/file.c @@ -1141,6 +1141,33 @@ fio_stat(char const* path, struct stat* st, bool follow_symlink, fio_location lo } } +/* + * Compare, that filename1 and filename2 is the same file + * in windows compare only filenames + */ +bool +fio_is_same_file(char const* filename1, char const* filename2, bool follow_symlink, fio_location location) +{ +#ifndef WIN32 + struct stat stat1, stat2; + + if (fio_stat(filename1, &stat1, follow_symlink, location) < 0) + elog(ERROR, "Can't stat file \"%s\": %s", filename1, strerror(errno)); + + if (fio_stat(filename2, &stat2, follow_symlink, location) < 0) + elog(ERROR, "Can't stat file \"%s\": %s", filename2, strerror(errno)); + + return stat1.st_ino == stat2.st_ino && stat1.st_dev == stat2.st_dev; +#else + char *abs_name1 = make_absolute_path(filename1); + char *abs_name2 = make_absolute_path(filename2); + bool result = strcmp(abs_name1, abs_name2) == 0; + free(abs_name2); + free(abs_name1); + return result; +#endif +} + /* * Read value of a symbolic link * this is a wrapper about readlink() syscall diff --git a/src/utils/file.h b/src/utils/file.h index edb5ea0f9..a554b4ab0 100644 --- a/src/utils/file.h +++ b/src/utils/file.h @@ -129,6 +129,7 @@ extern int fio_mkdir(char const* path, int mode, fio_location location); extern int fio_chmod(char const* path, int mode, fio_location location); extern int fio_access(char const* path, int mode, fio_location location); extern int fio_stat(char const* path, struct stat* st, bool follow_symlinks, fio_location location); +extern bool fio_is_same_file(char const* filename1, char const* filename2, bool follow_symlink, fio_location location); extern ssize_t fio_readlink(const char *path, char *value, size_t valsiz, fio_location location); extern DIR* fio_opendir(char const* path, fio_location location); extern struct dirent * fio_readdir(DIR *dirp); diff --git a/src/utils/pgut.c b/src/utils/pgut.c index 52599848d..2cf0ccbe7 100644 --- a/src/utils/pgut.c +++ b/src/utils/pgut.c @@ -977,6 +977,22 @@ pgut_strndup(const char *str, size_t n) return ret; } +/* + * Allocates new string, that contains part of filepath string minus trailing filename string + * If trailing filename string not found, returns copy of filepath. + * Result must be free by caller. + */ +char * +pgut_str_strip_trailing_filename(const char *filepath, const char *filename) +{ + size_t fp_len = strlen(filepath); + size_t fn_len = strlen(filename); + if (strncmp(filepath + fp_len - fn_len, filename, fn_len) == 0) + return pgut_strndup(filepath, fp_len - fn_len); + else + return pgut_strndup(filepath, fp_len); +} + FILE * pgut_fopen(const char *path, const char *mode, bool missing_ok) { diff --git a/src/utils/pgut.h b/src/utils/pgut.h index a1d7b5a93..fa0efe816 100644 --- a/src/utils/pgut.h +++ b/src/utils/pgut.h @@ -63,6 +63,7 @@ extern void *pgut_malloc0(size_t size); extern void *pgut_realloc(void *p, size_t size); extern char *pgut_strdup(const char *str); extern char *pgut_strndup(const char *str, size_t n); +extern char *pgut_str_strip_trailing_filename(const char *filepath, const char *filename); #define pgut_new(type) ((type *) pgut_malloc(sizeof(type))) #define pgut_new0(type) ((type *) pgut_malloc0(sizeof(type))) diff --git a/tests/archive.py b/tests/archive.py index fb6bd2989..02bb4aef1 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -1830,7 +1830,7 @@ def test_archive_options_1(self): # @unittest.skip("skip") # @unittest.expectedFailure - def test_wal_file_path(self): + def test_undefined_wal_file_path(self): """ check that archive-push works correct with undefined --wal-file-path @@ -1840,75 +1840,32 @@ def test_wal_file_path(self): node = self.make_simple_node( base_dir=os.path.join(module_name, fname, 'node'), set_replication=True, - initdb_params=['--data-checksums'], - pg_options={ - 'archive_mode': 'on'}) + initdb_params=['--data-checksums']) self.init_pb(backup_dir) self.add_instance(backup_dir, 'node', node) - self.set_archiving(backup_dir, 'node', node, compress=True) + self.set_archiving(backup_dir, 'node', node) archive_command = '\"{0}\" archive-push -B \"{1}\" --instance \"{2}\" --wal-file-name=%f'.format( self.probackup_path, backup_dir, 'node') self.set_auto_conf( node, {'archive_command': archive_command}) - node.slow_start() - - # FULL - self.backup_node(backup_dir, 'node', node) - - log_file = os.path.join(node.logs_dir, 'postgresql.log') - with open(log_file, 'r') as f: - log_content = f.read() - - self.assertIn('pg_probackup archive-push completed successfully', log_content) - - self.del_test_dir(module_name, fname) - - # @unittest.skip("skip") - # @unittest.expectedFailure - def test_wal_file_path_2(self): - """ - check that archive-push works correct with --wal-file-path=%p as usual - """ - fname = self.id().split('.')[3] - backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') - node = self.make_simple_node( - base_dir=os.path.join(module_name, fname, 'node'), - set_replication=True, - initdb_params=['--data-checksums'], - pg_options={ - 'archive_mode': 'on'}) - - self.init_pb(backup_dir) - self.add_instance(backup_dir, 'node', node) - self.set_archiving(backup_dir, 'node', node, compress=True) - archive_command = '\"{0}\" archive-push -B \"{1}\" --instance \"{2}\" --wal-file-path=%p --wal-file-name=%f'.format( - self.probackup_path, backup_dir, 'node') - self.set_auto_conf( - node, - {'archive_command': archive_command}) - node.slow_start() node.safe_psql( "postgres", "create table t_heap as select i" - " as id from generate_series(0,100) i") - - # FULL - self.backup_node(backup_dir, 'node', node, options=['--stream']) + " as id from generate_series(0, 10) i") + self.switch_wal_segment(node) - log_file = os.path.join(node.logs_dir, 'postgresql.log') - with open(log_file, 'r') as f: - log_content = f.read() + # check + self.assertEqual(self.show_archive(backup_dir, instance='node', tli=1)['min-segno'], '000000010000000000000001') - self.assertIn('pg_probackup archive-push completed successfully', log_content) self.del_test_dir(module_name, fname) # @unittest.skip("skip") # @unittest.expectedFailure - def test_wal_file_path_3(self): + def test_intermediate_archiving(self): """ check that archive-push works correct with --wal-file-path setting by user """ @@ -1935,7 +1892,7 @@ def test_wal_file_path_3(self): if os.name == 'posix': self.set_archiving(backup_dir, 'node', node, custom_archive_command='cp -v %p {0}/%f'.format(wal_dir)) elif os.name == 'nt': - self.set_archiving(backup_dir, 'node', node, custom_archive_command='xcopy /F "%p" "{0}/%f"'.format(wal_dir.replace("\\","\\\\"))) + self.set_archiving(backup_dir, 'node', node, custom_archive_command='xcopy /F "%p" "{0}\\%f"'.format(wal_dir.replace("\\","\\\\"))) else: self.assertTrue(False, 'Unexpected os family') @@ -1943,30 +1900,52 @@ def test_wal_file_path_3(self): node.safe_psql( "postgres", "create table t_heap as select i" - " as id from generate_series(0,1000) i") - - # FULL - self.backup_node(backup_dir, 'node', node, options=['--stream']) - - node.safe_psql( - "postgres", - "insert into t_heap select i" - " as id from generate_series(0,1000) i") - self.backup_node(backup_dir, 'node', node, - options=['--stream']) + " as id from generate_series(0, 10) i") + self.switch_wal_segment(node) - filename = '000000010000000000000001' + wal_segment = '000000010000000000000001' self.run_pb(["archive-push", "-B", backup_dir, "--instance=node", "-D", node.data_dir, - "--wal-file-path", wal_dir, "--wal-file-name", filename]) + "--wal-file-path", "{0}/{1}".format(wal_dir, wal_segment), "--wal-file-name", wal_segment]) - log_file = os.path.join( - backup_dir, 'log', 'pg_probackup.log') - with open(log_file, 'r') as f: - log_content = f.read() + self.assertEqual(self.show_archive(backup_dir, instance='node', tli=1)['min-segno'], wal_segment) + + self.del_test_dir(module_name, fname) + + # @unittest.skip("skip") + # @unittest.expectedFailure + def test_waldir_outside_pgdata_archiving(self): + """ + check that archive-push works correct with symlinked waldir + """ + fname = self.id().split('.')[3] + backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') + external_wal_dir = os.path.join(self.tmp_path, module_name, fname, 'ext_wal_dir') + shutil.rmtree(external_wal_dir, ignore_errors=True) + + node = self.make_simple_node( + base_dir=os.path.join(module_name, fname, 'node'), + initdb_params=['--data-checksums', '--waldir={0}'.format(external_wal_dir)]) + + if self.get_version(node) < self.version_to_num('10.0'): + self.del_test_dir(module_name, fname) + return unittest.skip( + 'Skipped because waldir outside pgdata is supported since PG 10') + + self.init_pb(backup_dir) + self.add_instance(backup_dir, 'node', node) + self.set_archiving(backup_dir, 'node', node) + + node.slow_start() + node.safe_psql( + "postgres", + "create table t_heap as select i" + " as id from generate_series(0, 10) i") + self.switch_wal_segment(node) - self.assertIn('pg_probackup archive-push completed successfully', log_content) + # check + self.assertEqual(self.show_archive(backup_dir, instance='node', tli=1)['min-segno'], '000000010000000000000001') self.del_test_dir(module_name, fname) From 3529a174e6e0824684f9151f687d0fc5bac1d7c0 Mon Sep 17 00:00:00 2001 From: "Mikhail A. Kulagin" Date: Tue, 16 Nov 2021 04:57:08 +0300 Subject: [PATCH 15/22] [PGPRO-5421] fix test_waldir_outside_pgdata_archiving for 9.6 --- tests/archive.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/archive.py b/tests/archive.py index 02bb4aef1..2cbc80510 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -1919,6 +1919,10 @@ def test_waldir_outside_pgdata_archiving(self): """ check that archive-push works correct with symlinked waldir """ + if self.pg_config_version < self.version_to_num('10.0'): + return unittest.skip( + 'Skipped because waldir outside pgdata is supported since PG 10') + fname = self.id().split('.')[3] backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') external_wal_dir = os.path.join(self.tmp_path, module_name, fname, 'ext_wal_dir') @@ -1928,11 +1932,6 @@ def test_waldir_outside_pgdata_archiving(self): base_dir=os.path.join(module_name, fname, 'node'), initdb_params=['--data-checksums', '--waldir={0}'.format(external_wal_dir)]) - if self.get_version(node) < self.version_to_num('10.0'): - self.del_test_dir(module_name, fname) - return unittest.skip( - 'Skipped because waldir outside pgdata is supported since PG 10') - self.init_pb(backup_dir) self.add_instance(backup_dir, 'node', node) self.set_archiving(backup_dir, 'node', node) From bbdf5cadad62786d4acd5af2313e90dbee01b869 Mon Sep 17 00:00:00 2001 From: "Mikhail A. Kulagin" Date: Thu, 18 Nov 2021 06:36:37 +0300 Subject: [PATCH 16/22] [PGPRO-5421] fix for test test_archive_push_sanity --- src/backup.c | 2 +- src/catchup.c | 6 +++--- src/init.c | 2 +- src/pg_probackup.c | 40 +++++++++++++++++++++++++--------------- src/pg_probackup.h | 2 +- src/restore.c | 2 +- src/util.c | 6 +++--- 7 files changed, 35 insertions(+), 25 deletions(-) diff --git a/src/backup.c b/src/backup.c index 1d08c3828..c575865c4 100644 --- a/src/backup.c +++ b/src/backup.c @@ -943,7 +943,7 @@ check_system_identifiers(PGconn *conn, const char *pgdata) uint64 system_id_conn; uint64 system_id_pgdata; - system_id_pgdata = get_system_identifier(pgdata, FIO_DB_HOST); + system_id_pgdata = get_system_identifier(pgdata, FIO_DB_HOST, false); system_id_conn = get_remote_system_identifier(conn); /* for checkdb check only system_id_pgdata and system_id_conn */ diff --git a/src/catchup.c b/src/catchup.c index 5a0c8e45a..f9145a395 100644 --- a/src/catchup.c +++ b/src/catchup.c @@ -48,7 +48,7 @@ catchup_init_state(PGNodeInfo *source_node_info, const char *source_pgdata, cons /* Get WAL segments size and system ID of source PG instance */ instance_config.xlog_seg_size = get_xlog_seg_size(source_pgdata); - instance_config.system_identifier = get_system_identifier(source_pgdata, FIO_DB_HOST); + instance_config.system_identifier = get_system_identifier(source_pgdata, FIO_DB_HOST, false); current.start_time = time(NULL); strlcpy(current.program_version, PROGRAM_VERSION, sizeof(current.program_version)); @@ -163,7 +163,7 @@ catchup_preflight_checks(PGNodeInfo *source_node_info, PGconn *source_conn, uint64 source_conn_id, source_id, dest_id; source_conn_id = get_remote_system_identifier(source_conn); - source_id = get_system_identifier(source_pgdata, FIO_DB_HOST); /* same as instance_config.system_identifier */ + source_id = get_system_identifier(source_pgdata, FIO_DB_HOST, false); /* same as instance_config.system_identifier */ if (source_conn_id != source_id) elog(ERROR, "Database identifiers mismatch: we connected to DB id %lu, but in \"%s\" we found id %lu", @@ -171,7 +171,7 @@ catchup_preflight_checks(PGNodeInfo *source_node_info, PGconn *source_conn, if (current.backup_mode != BACKUP_MODE_FULL) { - dest_id = get_system_identifier(dest_pgdata, FIO_LOCAL_HOST); + dest_id = get_system_identifier(dest_pgdata, FIO_LOCAL_HOST, false); if (source_conn_id != dest_id) elog(ERROR, "Database identifiers mismatch: we connected to DB id %lu, but in \"%s\" we found id %lu", source_conn_id, dest_pgdata, dest_id); diff --git a/src/init.c b/src/init.c index a4911cb5c..8773016b5 100644 --- a/src/init.c +++ b/src/init.c @@ -57,7 +57,7 @@ do_add_instance(InstanceState *instanceState, InstanceConfig *instance) "(-D, --pgdata)"); /* Read system_identifier from PGDATA */ - instance->system_identifier = get_system_identifier(instance->pgdata, FIO_DB_HOST); + instance->system_identifier = get_system_identifier(instance->pgdata, FIO_DB_HOST, false); /* Starting from PostgreSQL 11 read WAL segment size from PGDATA */ instance->xlog_seg_size = get_xlog_seg_size(instance->pgdata); diff --git a/src/pg_probackup.c b/src/pg_probackup.c index ad5edf1ea..49e226ace 100644 --- a/src/pg_probackup.c +++ b/src/pg_probackup.c @@ -801,17 +801,21 @@ main(int argc, char *argv[]) { /* Check archive-push parameters and construct archive_push_xlog_dir * - * There are 3 cases: + * There are 4 cases: * 1. no --wal-file-path specified -- use cwd, ./PG_XLOG_DIR for wal files * (and ./PG_XLOG_DIR/archive_status for .done files inside do_archive_push()) * in this case we can use batches and threads * 2. --wal-file-path is specified and it is the same dir as stored in pg_probackup.conf (instance_config.pgdata) * in this case we can use this path, as well as batches and thread - * 3. --wal-file-path is specified and it is different from instance_config.pgdata + * 3. --wal-file-path is specified and it isn't same dir as stored in pg_probackup.conf but control file present with correct system_id + * in this case we can use this path, as well as batches and thread + * (replica for example, see test_archive_push_sanity) + * 4. --wal-file-path is specified and it is different from instance_config.pgdata and no control file found * disable optimizations and work with user specified path */ bool check_system_id = true; uint64 system_id; + char current_dir[MAXPGPATH]; if (wal_file_name == NULL) elog(ERROR, "Required parameter is not specified: --wal-file-name %%f"); @@ -823,14 +827,13 @@ main(int argc, char *argv[]) if (instance_config.compress_alg == PGLZ_COMPRESS) elog(ERROR, "Cannot use pglz for WAL compression"); + if (!getcwd(current_dir, sizeof(current_dir))) + elog(ERROR, "getcwd() error"); + if (wal_file_path == NULL) { /* 1st case */ - char current_dir[MAXPGPATH]; - if (!getcwd(current_dir, sizeof(current_dir))) - elog(ERROR, "getcwd() error"); - - system_id = get_system_identifier(current_dir, FIO_DB_HOST); + system_id = get_system_identifier(current_dir, FIO_DB_HOST, false); join_path_components(archive_push_xlog_dir, current_dir, XLOGDIR); } else @@ -849,24 +852,31 @@ main(int argc, char *argv[]) if (fio_is_same_file(stripped_wal_file_path, archive_push_xlog_dir, true, FIO_DB_HOST)) { /* 2nd case */ - system_id = get_system_identifier(instance_config.pgdata, FIO_DB_HOST); + system_id = get_system_identifier(instance_config.pgdata, FIO_DB_HOST, false); /* archive_push_xlog_dir already have right value */ } else { - /* 3rd case */ - check_system_id = false; if (strlen(stripped_wal_file_path) < MAXPGPATH) strncpy(archive_push_xlog_dir, stripped_wal_file_path, MAXPGPATH); else elog(ERROR, "Value specified to --wal_file_path is too long"); - if (batch_size > 1 || num_threads > 1 || !no_ready_rename) + system_id = get_system_identifier(current_dir, FIO_DB_HOST, true); + /* 3rd case if control file present -- i.e. system_id != 0 */ + + if (system_id == 0) { - elog(WARNING, "Supplied --wal_file_path is outside pgdata, force safe values for options: --batch-size=1 -j 1 --no-ready-rename"); - batch_size = 1; - num_threads = 1; - no_ready_rename = true; + /* 4th case */ + check_system_id = false; + + if (batch_size > 1 || num_threads > 1 || !no_ready_rename) + { + elog(WARNING, "Supplied --wal_file_path is outside pgdata, force safe values for options: --batch-size=1 -j 1 --no-ready-rename"); + batch_size = 1; + num_threads = 1; + no_ready_rename = true; + } } } pfree(stripped_wal_file_path); diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 4e30eb477..a51794d98 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -1153,7 +1153,7 @@ extern XLogRecPtr get_next_record_lsn(const char *archivedir, XLogSegNo segno, T extern TimeLineID get_current_timeline(PGconn *conn); extern TimeLineID get_current_timeline_from_control(const char *pgdata_path, fio_location location, bool safe); extern XLogRecPtr get_checkpoint_location(PGconn *conn); -extern uint64 get_system_identifier(const char *pgdata_path, fio_location location); +extern uint64 get_system_identifier(const char *pgdata_path, fio_location location, bool safe); extern uint64 get_remote_system_identifier(PGconn *conn); extern uint32 get_data_checksum_version(bool safe); extern pg_crc32c get_pgcontrol_checksum(const char *pgdata_path); diff --git a/src/restore.c b/src/restore.c index 005984aed..47e3b0344 100644 --- a/src/restore.c +++ b/src/restore.c @@ -2186,7 +2186,7 @@ check_incremental_compatibility(const char *pgdata, uint64 system_identifier, */ elog(INFO, "Trying to read pg_control file in destination directory"); - system_id_pgdata = get_system_identifier(pgdata, FIO_DB_HOST); + system_id_pgdata = get_system_identifier(pgdata, FIO_DB_HOST, false); if (system_id_pgdata == instance_config.system_identifier) system_id_match = true; diff --git a/src/util.c b/src/util.c index f39b31d45..fb33fd046 100644 --- a/src/util.c +++ b/src/util.c @@ -247,15 +247,15 @@ get_checkpoint_location(PGconn *conn) } uint64 -get_system_identifier(const char *pgdata_path, fio_location location) +get_system_identifier(const char *pgdata_path, fio_location location, bool safe) { ControlFileData ControlFile; char *buffer; size_t size; /* First fetch file... */ - buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, location); - if (buffer == NULL) + buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, safe, location); + if (safe && buffer == NULL) return 0; digestControlFile(&ControlFile, buffer, size); pg_free(buffer); From f73a5d9a782ebdc52d96853b9524ee2c5fec78b6 Mon Sep 17 00:00:00 2001 From: Daria Lepikhova Date: Thu, 18 Nov 2021 11:38:59 +0500 Subject: [PATCH 17/22] Fixes in src/help.c about the list keys of archive-push command --- src/help.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/help.c b/src/help.c index 1515359e4..d00435d79 100644 --- a/src/help.c +++ b/src/help.c @@ -227,6 +227,7 @@ help_pg_probackup(void) printf(_("\n %s archive-push -B backup-path --instance=instance_name\n"), PROGRAM_NAME); printf(_(" --wal-file-name=wal-file-name\n")); + printf(_(" [--wal-file-path=wal-file-path]\n")); printf(_(" [-j num-threads] [--batch-size=batch_size]\n")); printf(_(" [--archive-timeout=timeout]\n")); printf(_(" [--no-ready-rename] [--no-sync]\n")); @@ -937,6 +938,7 @@ help_archive_push(void) { printf(_("\n%s archive-push -B backup-path --instance=instance_name\n"), PROGRAM_NAME); printf(_(" --wal-file-name=wal-file-name\n")); + printf(_(" [--wal-file-path=wal-file-path]\n")); printf(_(" [-j num-threads] [--batch-size=batch_size]\n")); printf(_(" [--archive-timeout=timeout]\n")); printf(_(" [--no-ready-rename] [--no-sync]\n")); @@ -951,6 +953,8 @@ help_archive_push(void) printf(_(" --instance=instance_name name of the instance to delete\n")); printf(_(" --wal-file-name=wal-file-name\n")); printf(_(" name of the file to copy into WAL archive\n")); + printf(_(" --wal-file-path=wal-file-path\n")); + printf(_(" relative destination path of the WAL archive\n")); printf(_(" -j, --threads=NUM number of parallel threads\n")); printf(_(" --batch-size=NUM number of files to be copied\n")); printf(_(" --archive-timeout=timeout wait timeout before discarding stale temp file(default: 5min)\n")); From 0efb8e674d0104567aa26fb41845bd4f58edabf9 Mon Sep 17 00:00:00 2001 From: Daria Lepikhova Date: Thu, 18 Nov 2021 18:02:38 +0500 Subject: [PATCH 18/22] Fix in expected/option_help.out to pass tests correctly --- src/help.c | 2 +- tests/expected/option_help.out | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/help.c b/src/help.c index d00435d79..a6530fc0e 100644 --- a/src/help.c +++ b/src/help.c @@ -985,8 +985,8 @@ static void help_archive_get(void) { printf(_("\n%s archive-get -B backup-path --instance=instance_name\n"), PROGRAM_NAME); - printf(_(" --wal-file-path=wal-file-path\n")); printf(_(" --wal-file-name=wal-file-name\n")); + printf(_(" [--wal-file-path=wal-file-path]\n")); printf(_(" [-j num-threads] [--batch-size=batch_size]\n")); printf(_(" [--no-validate-wal]\n")); printf(_(" [--remote-proto] [--remote-host]\n")); diff --git a/tests/expected/option_help.out b/tests/expected/option_help.out index 01384a893..dd3c4e865 100644 --- a/tests/expected/option_help.out +++ b/tests/expected/option_help.out @@ -144,6 +144,7 @@ pg_probackup - utility to manage backup/recovery of PostgreSQL database. pg_probackup archive-push -B backup-path --instance=instance_name --wal-file-name=wal-file-name + [--wal-file-path=wal-file-path] [-j num-threads] [--batch-size=batch_size] [--archive-timeout=timeout] [--no-ready-rename] [--no-sync] From 4194d7ef667a675287496ef63beb349a8fea7a1f Mon Sep 17 00:00:00 2001 From: Daria Lepikhova Date: Fri, 19 Nov 2021 12:57:06 +0500 Subject: [PATCH 19/22] Added clarifications about --wal-file-path to doc/pgprobackup.xml --- doc/pgprobackup.xml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/doc/pgprobackup.xml b/doc/pgprobackup.xml index a347e7b43..40d07d219 100644 --- a/doc/pgprobackup.xml +++ b/doc/pgprobackup.xml @@ -131,6 +131,7 @@ doc/src/sgml/pgprobackup.sgml backup_dir instance_name + wal_file_path wal_file_name option @@ -5367,7 +5368,9 @@ pg_probackup catchup -b catchup_mode Provides the path to the WAL file in archive_command and restore_command. Use the %p - variable as the value for this option for correct processing. + variable as the value for this option or skip it for setting it according to + value from pg_probackup.conf. + Or explicitly specify the path to a file outside the PGDATA. @@ -5380,6 +5383,8 @@ pg_probackup catchup -b catchup_mode archive_command and restore_command. Use the %f variable as the value for this option for correct processing. + Or explicitly specify the filename if given a