Skip to content

Commit 3b87069

Browse files
committed
Fixes for --wal-file-path option
1 parent 8dd36c1 commit 3b87069

File tree

1 file changed

+157
-165
lines changed

1 file changed

+157
-165
lines changed

src/archive.c

Lines changed: 157 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,51 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
145145
parray *batch_files = NULL;
146146
int n_threads;
147147

148+
char xlog_wal_path[MAXPGPATH];
149+
148150
if (wal_file_name == NULL)
149-
elog(INFO, "Required parameter is not specified: --wal-file-name %%f");
151+
elog(ERROR, "Required parameter is not specified: --wal-file-name %%f");
152+
153+
join_path_components(xlog_wal_path, PG_XLOG_DIR, wal_file_name);
154+
155+
if (wal_file_path == NULL)
156+
{
157+
elog(INFO, "Required parameter is not specified: --wal_file_path %%p "
158+
"Setting wal-file-path by default");
159+
wal_file_path = xlog_wal_path;
160+
}
161+
162+
if (strcmp(wal_file_path, xlog_wal_path)!=0)
163+
{
164+
char backup_wal_file_path[MAXPGPATH];
165+
char absolute_wal_file_path[MAXPGPATH];
166+
167+
join_path_components(absolute_wal_file_path, current_dir, wal_file_path);
168+
join_path_components(backup_wal_file_path, instance->arclog_path, wal_file_name);
169+
170+
elog(INFO, "wal_file_path is setted by user %s", wal_file_path);
171+
if (instance->compress_alg == PGLZ_COMPRESS)
172+
elog(ERROR, "pglz compression is not supported");
173+
174+
#ifdef HAVE_LIBZ
175+
if (instance->compress_alg == ZLIB_COMPRESS)
176+
is_compress = IsXLogFileName(wal_file_name);
177+
#endif
178+
179+
push_wal_file(absolute_wal_file_path, backup_wal_file_path, is_compress,
180+
overwrite, instance->compress_level);
181+
elog(INFO, "pg_probackup archive-push completed successfully");
182+
183+
return;
184+
}
185+
186+
/* Create 'archlog_path' directory. Do nothing if it already exists. */
187+
//fio_mkdir(instance->arclog_path, DIR_PERMISSION, FIO_BACKUP_HOST);
188+
189+
#ifdef HAVE_LIBZ
190+
if (instance->compress_alg == ZLIB_COMPRESS)
191+
is_compress = true;
192+
#endif
150193

151194
if (!getcwd(current_dir, sizeof(current_dir)))
152195
elog(ERROR, "getcwd() error");
@@ -160,189 +203,139 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
160203
if (system_id != instance->system_identifier)
161204
elog(ERROR, "Refuse to push WAL segment %s into archive. Instance parameters mismatch."
162205
"Instance '%s' should have SYSTEM_ID = " UINT64_FORMAT " instead of " UINT64_FORMAT,
163-
wal_file_name, instance->name, instance->system_identifier, system_id);
206+
wal_file_name, instance->name, instance->system_identifier, system_id);
164207

165208
if (instance->compress_alg == PGLZ_COMPRESS)
166209
elog(ERROR, "Cannot use pglz for WAL compression");
167210

168-
/* */
169-
elog(INFO, "instance %s", instance->backup_instance_path);
170-
elog(INFO, "wal_file_path %s", wal_file_path);
171-
if ((instance->backup_instance_path == wal_file_path) || (wal_file_path == NULL))
211+
join_path_components(pg_xlog_dir, current_dir, XLOGDIR);
212+
join_path_components(archive_status_dir, pg_xlog_dir, "archive_status");
213+
214+
/* Setup filelist and locks */
215+
batch_files = setup_push_filelist(archive_status_dir, wal_file_name, batch_size);
216+
217+
n_threads = num_threads;
218+
if (num_threads > parray_num(batch_files))
219+
n_threads = parray_num(batch_files);
220+
221+
elog(INFO, "pg_probackup archive-push WAL file: %s, "
222+
"threads: %i/%i, batch: %lu/%i, compression: %s",
223+
wal_file_name, n_threads, num_threads,
224+
parray_num(batch_files), batch_size,
225+
is_compress ? "zlib" : "none");
226+
227+
num_threads = n_threads;
228+
229+
/* Single-thread push
230+
* We don`t want to start multi-thread push, if number of threads in equal to 1,
231+
* or the number of files ready to push is small.
232+
* Multithreading in remote mode isn`t cheap,
233+
* establishing ssh connection can take 100-200ms, so running and terminating
234+
* one thread using generic multithread approach can take
235+
* almost as much time as copying itself.
236+
* TODO: maybe we should be more conservative and force single thread
237+
* push if batch_files array is small.
238+
*/
239+
if (num_threads == 1 || (parray_num(batch_files) == 1))
172240
{
173-
if(wal_file_path == NULL)
174-
{
175-
elog(INFO, "Required parameter is not specified: --wal-file-path. Set by default");
176-
wal_file_path = instance->backup_instance_path;
177-
}
178-
179-
join_path_components(pg_xlog_dir, current_dir, XLOGDIR);
180-
join_path_components(archive_status_dir, pg_xlog_dir, "archive_status");
181-
182-
/* Create 'archlog_path' directory. Do nothing if it already exists. */
183-
//fio_mkdir(instance->arclog_path, DIR_PERMISSION, FIO_BACKUP_HOST);
184-
185-
#ifdef HAVE_LIBZ
186-
if (instance->compress_alg == ZLIB_COMPRESS)
187-
is_compress = true;
188-
#endif
189-
190-
/* Setup filelist and locks */
191-
batch_files = setup_push_filelist(archive_status_dir, wal_file_name, batch_size);
192-
193-
n_threads = num_threads;
194-
if (num_threads > parray_num(batch_files))
195-
n_threads = parray_num(batch_files);
196-
197-
elog(INFO, "pg_probackup archive-push WAL file: %s, "
198-
"threads: %i/%i, batch: %lu/%i, compression: %s",
199-
wal_file_name, n_threads, num_threads,
200-
parray_num(batch_files), batch_size,
201-
is_compress ? "zlib" : "none");
202-
203-
num_threads = n_threads;
204-
205-
/* Single-thread push
206-
* We don`t want to start multi-thread push, if number of threads in equal to 1,
207-
* or the number of files ready to push is small.
208-
* Multithreading in remote mode isn`t cheap,
209-
* establishing ssh connection can take 100-200ms, so running and terminating
210-
* one thread using generic multithread approach can take
211-
* almost as much time as copying itself.
212-
* TODO: maybe we should be more conservative and force single thread
213-
* push if batch_files array is small.
214-
*/
215-
if (num_threads == 1 || (parray_num(batch_files) == 1))
241+
INSTR_TIME_SET_CURRENT(start_time);
242+
for (i = 0; i < parray_num(batch_files); i++)
216243
{
217-
INSTR_TIME_SET_CURRENT(start_time);
218-
for (i = 0; i < parray_num(batch_files); i++)
219-
{
220-
int rc;
221-
WALSegno *xlogfile = (WALSegno *) parray_get(batch_files, i);
222-
223-
rc = push_file(xlogfile, archive_status_dir,
224-
pg_xlog_dir, instance->arclog_path,
225-
overwrite, no_sync,
226-
instance->archive_timeout,
227-
no_ready_rename || (strcmp(xlogfile->name, wal_file_name) == 0) ? true : false,
228-
is_compress && IsXLogFileName(xlogfile->name) ? true : false,
229-
instance->compress_level);
230-
if (rc == 0)
231-
n_total_pushed++;
232-
else
233-
n_total_skipped++;
234-
}
244+
int rc;
245+
WALSegno *xlogfile = (WALSegno *) parray_get(batch_files, i);
235246

236-
push_isok = true;
237-
goto push_done;
247+
rc = push_file(xlogfile, archive_status_dir,
248+
pg_xlog_dir, instance->arclog_path,
249+
overwrite, no_sync,
250+
instance->archive_timeout,
251+
no_ready_rename || (strcmp(xlogfile->name, wal_file_name) == 0) ? true : false,
252+
is_compress && IsXLogFileName(xlogfile->name) ? true : false,
253+
instance->compress_level);
254+
if (rc == 0)
255+
n_total_pushed++;
256+
else
257+
n_total_skipped++;
238258
}
239259

240-
/* init thread args with its own segno */
241-
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
242-
threads_args = (archive_push_arg *) palloc(sizeof(archive_push_arg) * num_threads);
243-
244-
for (i = 0; i < num_threads; i++)
245-
{
246-
archive_push_arg *arg = &(threads_args[i]);
247-
248-
arg->first_filename = wal_file_name;
249-
arg->archive_dir = instance->arclog_path;
250-
arg->pg_xlog_dir = pg_xlog_dir;
251-
arg->archive_status_dir = archive_status_dir;
252-
arg->overwrite = overwrite;
253-
arg->compress = is_compress;
254-
arg->no_sync = no_sync;
255-
arg->no_ready_rename = no_ready_rename;
256-
arg->archive_timeout = instance->archive_timeout;
260+
push_isok = true;
261+
goto push_done;
262+
}
257263

258-
arg->compress_alg = instance->compress_alg;
259-
arg->compress_level = instance->compress_level;
264+
/* init thread args with its own segno */
265+
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
266+
threads_args = (archive_push_arg *) palloc(sizeof(archive_push_arg) * num_threads);
260267

261-
arg->files = batch_files;
262-
arg->n_pushed = 0;
263-
arg->n_skipped = 0;
268+
for (i = 0; i < num_threads; i++)
269+
{
270+
archive_push_arg *arg = &(threads_args[i]);
271+
272+
arg->first_filename = wal_file_name;
273+
arg->archive_dir = instance->arclog_path;
274+
arg->pg_xlog_dir = pg_xlog_dir;
275+
arg->archive_status_dir = archive_status_dir;
276+
arg->overwrite = overwrite;
277+
arg->compress = is_compress;
278+
arg->no_sync = no_sync;
279+
arg->no_ready_rename = no_ready_rename;
280+
arg->archive_timeout = instance->archive_timeout;
281+
282+
arg->compress_alg = instance->compress_alg;
283+
arg->compress_level = instance->compress_level;
284+
285+
arg->files = batch_files;
286+
arg->n_pushed = 0;
287+
arg->n_skipped = 0;
288+
289+
arg->thread_num = i+1;
290+
/* By default there are some error */
291+
arg->ret = 1;
292+
}
264293

265-
arg->thread_num = i+1;
266-
/* By default there are some error */
267-
arg->ret = 1;
268-
}
294+
/* Run threads */
295+
INSTR_TIME_SET_CURRENT(start_time);
296+
for (i = 0; i < num_threads; i++)
297+
{
298+
archive_push_arg *arg = &(threads_args[i]);
299+
pthread_create(&threads[i], NULL, push_files, arg);
300+
}
269301

270-
/* Run threads */
271-
INSTR_TIME_SET_CURRENT(start_time);
272-
for (i = 0; i < num_threads; i++)
302+
/* Wait threads */
303+
for (i = 0; i < num_threads; i++)
304+
{
305+
pthread_join(threads[i], NULL);
306+
if (threads_args[i].ret == 1)
273307
{
274-
archive_push_arg *arg = &(threads_args[i]);
275-
pthread_create(&threads[i], NULL, push_files, arg);
308+
push_isok = false;
309+
n_total_failed++;
276310
}
277311

278-
/* Wait threads */
279-
for (i = 0; i < num_threads; i++)
280-
{
281-
pthread_join(threads[i], NULL);
282-
if (threads_args[i].ret == 1)
283-
{
284-
push_isok = false;
285-
n_total_failed++;
286-
}
287-
288-
n_total_pushed += threads_args[i].n_pushed;
289-
n_total_skipped += threads_args[i].n_skipped;
290-
}
312+
n_total_pushed += threads_args[i].n_pushed;
313+
n_total_skipped += threads_args[i].n_skipped;
314+
}
291315

292-
/* Note, that we are leaking memory here,
293-
* because pushing into archive is a very
294-
* time-sensetive operation, so we skip freeing stuff.
295-
*/
316+
/* Note, that we are leaking memory here,
317+
* because pushing into archive is a very
318+
* time-sensetive operation, so we skip freeing stuff.
319+
*/
296320

297321
push_done:
298-
fio_disconnect();
299-
/* calculate elapsed time */
300-
INSTR_TIME_SET_CURRENT(end_time);
301-
INSTR_TIME_SUBTRACT(end_time, start_time);
302-
push_time = INSTR_TIME_GET_DOUBLE(end_time);
303-
pretty_time_interval(push_time, pretty_time_str, 20);
304-
305-
if (push_isok)
306-
/* report number of files pushed into archive */
307-
elog(INFO, "pg_probackup archive-push completed successfully, "
308-
"pushed: %u, skipped: %u, time elapsed: %s",
309-
n_total_pushed, n_total_skipped, pretty_time_str);
310-
else
311-
elog(ERROR, "pg_probackup archive-push failed, "
312-
"pushed: %i, skipped: %u, failed: %u, time elapsed: %s",
313-
n_total_pushed, n_total_skipped, n_total_failed,
314-
pretty_time_str);
315-
}
322+
fio_disconnect();
323+
/* calculate elapsed time */
324+
INSTR_TIME_SET_CURRENT(end_time);
325+
INSTR_TIME_SUBTRACT(end_time, start_time);
326+
push_time = INSTR_TIME_GET_DOUBLE(end_time);
327+
pretty_time_interval(push_time, pretty_time_str, 20);
328+
329+
if (push_isok)
330+
/* report number of files pushed into archive */
331+
elog(INFO, "pg_probackup archive-push completed successfully, "
332+
"pushed: %u, skipped: %u, time elapsed: %s",
333+
n_total_pushed, n_total_skipped, pretty_time_str);
316334
else
317-
{
318-
/* If user passed a directory different from saved in pg_probackup.conf,
319-
* work according to the scheme with a single copy of files
320-
*/
321-
char backup_wal_file_path[MAXPGPATH];
322-
char absolute_wal_file_path[MAXPGPATH];
323-
324-
/* Check that -j option is not set with --wal-file-path option and ignore it*/
325-
if (num_threads > 1)
326-
elog(ERROR, "Option -j is not working with user defined --wal-file-path. Ignore");
327-
328-
join_path_components(absolute_wal_file_path, current_dir, wal_file_path);
329-
join_path_components(backup_wal_file_path, instance->arclog_path, wal_file_name);
330-
331-
elog(INFO, "pg_probackup archive-push from %s to %s", absolute_wal_file_path, backup_wal_file_path);
332-
333-
if (instance->compress_alg == PGLZ_COMPRESS)
334-
elog(ERROR, "pglz compression is not supported");
335-
336-
#ifdef HAVE_LIBZ
337-
if (instance->compress_alg == ZLIB_COMPRESS)
338-
is_compress = IsXLogFileName(wal_file_name);
339-
#endif
340-
341-
push_wal_file(absolute_wal_file_path, backup_wal_file_path, is_compress,
342-
overwrite, instance->compress_level);
343-
elog(INFO, "pg_probackup archive-push completed successfully");
344-
}
345-
335+
elog(ERROR, "pg_probackup archive-push failed, "
336+
"pushed: %i, skipped: %u, failed: %u, time elapsed: %s",
337+
n_total_pushed, n_total_skipped, n_total_failed,
338+
pretty_time_str);
346339
}
347340

348341
/* ------------- INTERNAL FUNCTIONS ---------- */
@@ -1178,7 +1171,6 @@ get_gz_error(gzFile gzf, int errnum)
11781171
}
11791172
#endif
11801173

1181-
11821174
/*
11831175
* compare CRC of two WAL files.
11841176
* If necessary, decompress WAL file from path2

0 commit comments

Comments
 (0)