Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit c10b328

Browse files
committedOct 15, 2021
Used an existing code fragment copying one file into one stream instead of new code
1 parent bf0936c commit c10b328

File tree

1 file changed

+52
-364
lines changed

1 file changed

+52
-364
lines changed
 

Diff for: ‎src/archive.c

+52-364
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ static int push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_
2121
const char *archive_dir, bool overwrite, bool no_sync,
2222
int compress_level, uint32 archive_timeout);
2323
#endif
24-
static void push_wal_file(const char *from_path, const char *to_path,
25-
bool is_compress, bool overwrite, int compress_level);
2624
static void *push_files(void *arg);
2725
static void *get_files(void *arg);
2826
static bool get_wal_file(const char *filename, const char *from_path, const char *to_path,
@@ -32,12 +30,10 @@ static int get_wal_file_internal(const char *from_path, const char *to_path, FIL
3230
#ifdef HAVE_LIBZ
3331
static const char *get_gz_error(gzFile gzf, int errnum);
3432
#endif
35-
static bool fileEqualCRC(const char *path1, const char *path2,
36-
bool path2_is_compressed);
37-
static void copy_file_attributes(const char *from_path,
38-
fio_location from_location,
39-
const char *to_path, fio_location to_location,
40-
bool unlink_on_error);
33+
// static void copy_file_attributes(const char *from_path,
34+
// fio_location from_location,
35+
// const char *to_path, fio_location to_location,
36+
// bool unlink_on_error);
4137

4238
static bool next_wal_segment_exists(TimeLineID tli, XLogSegNo segno, const char *prefetch_dir, uint32 wal_seg_size);
4339
static uint32 run_wal_prefetch(const char *prefetch_dir, const char *archive_dir, TimeLineID tli,
@@ -152,6 +148,11 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
152148

153149
join_path_components(xlog_wal_path, PG_XLOG_DIR, wal_file_name);
154150

151+
#ifdef HAVE_LIBZ
152+
if (instance->compress_alg == ZLIB_COMPRESS)
153+
is_compress = true;
154+
#endif
155+
155156
if (wal_file_path == NULL)
156157
{
157158
elog(INFO, "Required parameter is not specified: --wal_file_path %%p "
@@ -161,59 +162,35 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
161162

162163
if (strcmp(wal_file_path, xlog_wal_path)!=0)
163164
{
164-
char backup_wal_file_path[MAXPGPATH];
165-
char absolute_wal_file_path[MAXPGPATH];
166-
167-
join_path_components(absolute_wal_file_path, wal_file_path, wal_file_name);
168-
join_path_components(backup_wal_file_path, instance->arclog_path, wal_file_name);
169-
170165
elog(INFO, "wal_file_path is setted by user %s", wal_file_path);
171166

172-
if ((batch_size > 1)||(num_threads > 1))
173-
elog(WARNING, "Options -j and --batch-size are ignored "
174-
"with --wal-file-path setted by user");
175-
176-
if (instance->compress_alg == PGLZ_COMPRESS)
177-
elog(ERROR, "pglz compression is not supported");
178-
179-
#ifdef HAVE_LIBZ
180-
if (instance->compress_alg == ZLIB_COMPRESS)
181-
is_compress = IsXLogFileName(wal_file_name);
182-
#endif
183-
184-
push_wal_file(absolute_wal_file_path, backup_wal_file_path, is_compress,
185-
overwrite, instance->compress_level);
186-
elog(INFO, "pg_probackup archive-push completed successfully");
187-
188-
return;
167+
join_path_components(pg_xlog_dir, instance->pgdata, XLOGDIR);
189168
}
169+
else
170+
{
171+
/* Create 'archlog_path' directory. Do nothing if it already exists. */
172+
//fio_mkdir(instance->arclog_path, DIR_PERMISSION, FIO_BACKUP_HOST);
190173

191-
/* Create 'archlog_path' directory. Do nothing if it already exists. */
192-
//fio_mkdir(instance->arclog_path, DIR_PERMISSION, FIO_BACKUP_HOST);
193-
194-
#ifdef HAVE_LIBZ
195-
if (instance->compress_alg == ZLIB_COMPRESS)
196-
is_compress = true;
197-
#endif
174+
if (!getcwd(current_dir, sizeof(current_dir)))
175+
elog(ERROR, "getcwd() error");
198176

199-
if (!getcwd(current_dir, sizeof(current_dir)))
200-
elog(ERROR, "getcwd() error");
177+
/* verify that archive-push --instance parameter is valid */
178+
system_id = get_system_identifier(current_dir);
201179

202-
/* verify that archive-push --instance parameter is valid */
203-
system_id = get_system_identifier(current_dir);
180+
if (instance->pgdata == NULL)
181+
elog(ERROR, "Cannot read pg_probackup.conf for this instance");
204182

205-
if (instance->pgdata == NULL)
206-
elog(ERROR, "Cannot read pg_probackup.conf for this instance");
183+
if (system_id != instance->system_identifier)
184+
elog(ERROR, "Refuse to push WAL segment %s into archive. Instance parameters mismatch."
185+
"Instance '%s' should have SYSTEM_ID = " UINT64_FORMAT " instead of " UINT64_FORMAT,
186+
wal_file_name, instance->name, instance->system_identifier, system_id);
207187

208-
if (system_id != instance->system_identifier)
209-
elog(ERROR, "Refuse to push WAL segment %s into archive. Instance parameters mismatch."
210-
"Instance '%s' should have SYSTEM_ID = " UINT64_FORMAT " instead of " UINT64_FORMAT,
211-
wal_file_name, instance->name, instance->system_identifier, system_id);
188+
if (instance->compress_alg == PGLZ_COMPRESS)
189+
elog(ERROR, "Cannot use pglz for WAL compression");
212190

213-
if (instance->compress_alg == PGLZ_COMPRESS)
214-
elog(ERROR, "Cannot use pglz for WAL compression");
191+
join_path_components(pg_xlog_dir, current_dir, XLOGDIR);
192+
}
215193

216-
join_path_components(pg_xlog_dir, current_dir, XLOGDIR);
217194
join_path_components(archive_status_dir, pg_xlog_dir, "archive_status");
218195

219196
/* Setup filelist and locks */
@@ -932,237 +909,6 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
932909
}
933910
#endif
934911

935-
/*
936-
* Copy WAL segment from pgdata to archive catalog with possible compression.
937-
*/
938-
void
939-
push_wal_file(const char *from_path, const char *to_path, bool is_compress,
940-
bool overwrite, int compress_level)
941-
{
942-
FILE *in = NULL;
943-
int out = -1;
944-
char buf[XLOG_BLCKSZ];
945-
const char *to_path_p;
946-
char to_path_temp[MAXPGPATH];
947-
int errno_temp;
948-
/* partial handling */
949-
struct stat st;
950-
int partial_try_count = 0;
951-
int partial_file_size = 0;
952-
bool partial_file_exists = false;
953-
954-
#ifdef HAVE_LIBZ
955-
char gz_to_path[MAXPGPATH];
956-
gzFile gz_out = NULL;
957-
958-
if (is_compress)
959-
{
960-
snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path);
961-
to_path_p = gz_to_path;
962-
}
963-
else
964-
#endif
965-
to_path_p = to_path;
966-
967-
// elog(INFO, "from_path %s", from_path);
968-
// Assert(0);
969-
970-
/* open file for read */
971-
in = fio_fopen(from_path, PG_BINARY_R, FIO_DB_HOST);
972-
if (in == NULL)
973-
elog(ERROR, "Cannot open source WAL file \"%s\": %s", from_path,
974-
strerror(errno));
975-
976-
/* Check if possible to skip copying */
977-
if (fileExists(to_path_p, FIO_BACKUP_HOST))
978-
{
979-
if (fileEqualCRC(from_path, to_path_p, is_compress))
980-
return;
981-
/* Do not copy and do not rise error. Just quit as normal. */
982-
else if (!overwrite)
983-
elog(ERROR, "WAL segment \"%s\" already exists.", to_path_p);
984-
}
985-
986-
/* open backup file for write */
987-
#ifdef HAVE_LIBZ
988-
if (is_compress)
989-
{
990-
snprintf(to_path_temp, sizeof(to_path_temp), "%s.part", gz_to_path);
991-
992-
gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, compress_level, FIO_BACKUP_HOST);
993-
if (gz_out == NULL)
994-
{
995-
partial_file_exists = true;
996-
elog(WARNING, "Cannot open destination temporary WAL file \"%s\": %s",
997-
to_path_temp, strerror(errno));
998-
}
999-
}
1000-
else
1001-
#endif
1002-
{
1003-
snprintf(to_path_temp, sizeof(to_path_temp), "%s.part", to_path);
1004-
1005-
out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_BACKUP_HOST);
1006-
if (out < 0)
1007-
{
1008-
partial_file_exists = true;
1009-
elog(WARNING, "Cannot open destination temporary WAL file \"%s\": %s",
1010-
to_path_temp, strerror(errno));
1011-
}
1012-
}
1013-
1014-
/* Partial file is already exists, it could have happened due to failed archive-push,
1015-
* in this case partial file can be discarded, or due to concurrent archiving.
1016-
*
1017-
* Our main goal here is to try to handle partial file to prevent stalling of
1018-
* continious archiving.
1019-
* To ensure that ecncountered partial file is actually a stale "orphaned" file,
1020-
* check its size every second.
1021-
* If the size has not changed in PARTIAL_WAL_TIMER seconds, we can consider
1022-
* the file stale and reuse it.
1023-
* If file size is changing, it means that another archiver works at the same
1024-
* directory with the same files. Such partial files cannot be reused.
1025-
*/
1026-
if (partial_file_exists)
1027-
{
1028-
while (partial_try_count < PARTIAL_WAL_TIMER)
1029-
{
1030-
1031-
if (fio_stat(to_path_temp, &st, false, FIO_BACKUP_HOST) < 0)
1032-
/* It is ok if partial is gone, we can safely error out */
1033-
elog(ERROR, "Cannot stat destination temporary WAL file \"%s\": %s", to_path_temp,
1034-
strerror(errno));
1035-
1036-
/* first round */
1037-
if (!partial_try_count)
1038-
partial_file_size = st.st_size;
1039-
1040-
/* file size is changing */
1041-
if (st.st_size > partial_file_size)
1042-
elog(ERROR, "Destination temporary WAL file \"%s\" is not stale", to_path_temp);
1043-
1044-
sleep(1);
1045-
partial_try_count++;
1046-
}
1047-
1048-
/* Partial segment is considered stale, so reuse it */
1049-
elog(WARNING, "Reusing stale destination temporary WAL file \"%s\"", to_path_temp);
1050-
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
1051-
1052-
#ifdef HAVE_LIBZ
1053-
if (is_compress)
1054-
{
1055-
gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, compress_level, FIO_BACKUP_HOST);
1056-
if (gz_out == NULL)
1057-
elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s",
1058-
to_path_temp, strerror(errno));
1059-
}
1060-
else
1061-
#endif
1062-
{
1063-
out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_BACKUP_HOST);
1064-
if (out < 0)
1065-
elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s",
1066-
to_path_temp, strerror(errno));
1067-
}
1068-
}
1069-
1070-
/* copy content */
1071-
for (;;)
1072-
{
1073-
ssize_t read_len = 0;
1074-
1075-
read_len = fio_fread(in, buf, sizeof(buf));
1076-
1077-
// Assert(0);
1078-
1079-
if (read_len < 0)
1080-
{
1081-
errno_temp = errno;
1082-
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
1083-
elog(ERROR,
1084-
"Cannot read source WAL file \"%s\": %s",
1085-
from_path, strerror(errno_temp));
1086-
}
1087-
1088-
if (read_len > 0)
1089-
{
1090-
#ifdef HAVE_LIBZ
1091-
if (is_compress)
1092-
{
1093-
if (fio_gzwrite(gz_out, buf, read_len) != read_len)
1094-
{
1095-
errno_temp = errno;
1096-
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
1097-
elog(ERROR, "Cannot write to compressed WAL file \"%s\": %s",
1098-
to_path_temp, get_gz_error(gz_out, errno_temp));
1099-
}
1100-
}
1101-
else
1102-
#endif
1103-
{
1104-
if (fio_write(out, buf, read_len) != read_len)
1105-
{
1106-
errno_temp = errno;
1107-
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
1108-
elog(ERROR, "Cannot write to WAL file \"%s\": %s",
1109-
to_path_temp, strerror(errno_temp));
1110-
}
1111-
}
1112-
}
1113-
1114-
if (read_len == 0)
1115-
break;
1116-
}
1117-
1118-
#ifdef HAVE_LIBZ
1119-
if (is_compress)
1120-
{
1121-
if (fio_gzclose(gz_out) != 0)
1122-
{
1123-
errno_temp = errno;
1124-
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
1125-
elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
1126-
to_path_temp, get_gz_error(gz_out, errno_temp));
1127-
}
1128-
}
1129-
else
1130-
#endif
1131-
{
1132-
if (fio_flush(out) != 0 || fio_close(out) != 0)
1133-
{
1134-
errno_temp = errno;
1135-
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
1136-
elog(ERROR, "Cannot write WAL file \"%s\": %s",
1137-
to_path_temp, strerror(errno_temp));
1138-
}
1139-
}
1140-
1141-
if (fio_fclose(in))
1142-
{
1143-
errno_temp = errno;
1144-
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
1145-
elog(ERROR, "Cannot close source WAL file \"%s\": %s",
1146-
from_path, strerror(errno_temp));
1147-
}
1148-
1149-
/* update file permission. */
1150-
copy_file_attributes(from_path, FIO_DB_HOST, to_path_temp, FIO_BACKUP_HOST, true);
1151-
1152-
if (fio_rename(to_path_temp, to_path_p, FIO_BACKUP_HOST) < 0)
1153-
{
1154-
errno_temp = errno;
1155-
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
1156-
elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s",
1157-
to_path_temp, to_path_p, strerror(errno_temp));
1158-
}
1159-
1160-
#ifdef HAVE_LIBZ
1161-
if (is_compress)
1162-
elog(INFO, "WAL file compressed to \"%s\"", gz_to_path);
1163-
#endif
1164-
}
1165-
1166912
#ifdef HAVE_LIBZ
1167913
/*
1168914
* Show error during work with compressed file
@@ -1181,88 +927,30 @@ get_gz_error(gzFile gzf, int errnum)
1181927
}
1182928
#endif
1183929

1184-
/*
1185-
* compare CRC of two WAL files.
1186-
* If necessary, decompress WAL file from path2
1187-
*/
1188-
static bool
1189-
fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed)
1190-
{
1191-
pg_crc32 crc1;
1192-
pg_crc32 crc2;
1193-
1194-
/* Get checksum of backup file */
1195-
#ifdef HAVE_LIBZ
1196-
if (path2_is_compressed)
1197-
{
1198-
char buf [1024];
1199-
gzFile gz_in = NULL;
1200-
1201-
INIT_FILE_CRC32(true, crc2);
1202-
gz_in = fio_gzopen(path2, PG_BINARY_R, Z_DEFAULT_COMPRESSION, FIO_BACKUP_HOST);
1203-
if (gz_in == NULL)
1204-
/* File cannot be read */
1205-
elog(ERROR,
1206-
"Cannot compare WAL file \"%s\" with compressed \"%s\"",
1207-
path1, path2);
1208-
1209-
for (;;)
1210-
{
1211-
int read_len = fio_gzread(gz_in, buf, sizeof(buf));
1212-
if (read_len <= 0 && !fio_gzeof(gz_in))
1213-
{
1214-
/* An error occurred while reading the file */
1215-
elog(WARNING,
1216-
"Cannot compare WAL file \"%s\" with compressed \"%s\": %d",
1217-
path1, path2, read_len);
1218-
return false;
1219-
}
1220-
COMP_FILE_CRC32(true, crc2, buf, read_len);
1221-
if (fio_gzeof(gz_in) || read_len == 0)
1222-
break;
1223-
}
1224-
FIN_FILE_CRC32(true, crc2);
1225-
1226-
if (fio_gzclose(gz_in) != 0)
1227-
elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
1228-
path2, get_gz_error(gz_in, errno));
1229-
}
1230-
else
1231-
#endif
1232-
{
1233-
crc2 = fio_get_crc32(path2, FIO_BACKUP_HOST, true);
1234-
}
1235-
1236-
/* Get checksum of original file */
1237-
crc1 = fio_get_crc32(path1, FIO_DB_HOST, true);
1238-
1239-
return EQ_CRC32C(crc1, crc2);
1240-
}
1241-
1242-
/* Copy file attributes */
1243-
static void
1244-
copy_file_attributes(const char *from_path, fio_location from_location,
1245-
const char *to_path, fio_location to_location,
1246-
bool unlink_on_error)
1247-
{
1248-
struct stat st;
1249-
1250-
if (fio_stat(from_path, &st, true, from_location) == -1)
1251-
{
1252-
if (unlink_on_error)
1253-
fio_unlink(to_path, to_location);
1254-
elog(ERROR, "Cannot stat file \"%s\": %s",
1255-
from_path, strerror(errno));
1256-
}
1257-
1258-
if (fio_chmod(to_path, st.st_mode, to_location) == -1)
1259-
{
1260-
if (unlink_on_error)
1261-
fio_unlink(to_path, to_location);
1262-
elog(ERROR, "Cannot change mode of file \"%s\": %s",
1263-
to_path, strerror(errno));
1264-
}
1265-
}
930+
// /* Copy file attributes */
931+
// static void
932+
// copy_file_attributes(const char *from_path, fio_location from_location,
933+
// const char *to_path, fio_location to_location,
934+
// bool unlink_on_error)
935+
// {
936+
// struct stat st;
937+
938+
// if (fio_stat(from_path, &st, true, from_location) == -1)
939+
// {
940+
// if (unlink_on_error)
941+
// fio_unlink(to_path, to_location);
942+
// elog(ERROR, "Cannot stat file \"%s\": %s",
943+
// from_path, strerror(errno));
944+
// }
945+
946+
// if (fio_chmod(to_path, st.st_mode, to_location) == -1)
947+
// {
948+
// if (unlink_on_error)
949+
// fio_unlink(to_path, to_location);
950+
// elog(ERROR, "Cannot change mode of file \"%s\": %s",
951+
// to_path, strerror(errno));
952+
// }
953+
// }
1266954

1267955
/* Look for files with '.ready' suffix in archive_status directory
1268956
* and pack such files into batch sized array.

0 commit comments

Comments
 (0)
Please sign in to comment.