Skip to content

Commit 6173b21

Browse files
authored
Merge pull request #25 from postgrespro/PBCKP-278_cfs-ptrack
Pbckp 278 cfs ptrack
2 parents 936db26 + 2ae320a commit 6173b21

File tree

5 files changed

+264
-11
lines changed

5 files changed

+264
-11
lines changed

Diff for: engine.c

+76
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@
3636
#include "catalog/pg_tablespace.h"
3737
#include "miscadmin.h"
3838
#include "port/pg_crc32c.h"
39+
#ifdef PGPRO_EE
40+
/* For file_is_in_cfs_tablespace() only. */
41+
#include "common/cfs_common.h"
42+
#endif
3943
#include "storage/copydir.h"
4044
#if PG_VERSION_NUM >= 120000
4145
#include "storage/md.h"
@@ -91,6 +95,44 @@ ptrack_write_chunk(int fd, pg_crc32c *crc, char *chunk, size_t size)
9195
}
9296
}
9397

98+
/*
99+
* Determines whether given file path is a path to a cfm file.
100+
*/
101+
bool
102+
is_cfm_file_path(const char *filepath) {
103+
ssize_t len = strlen(filepath);
104+
105+
// For this length checks we assume that the filename is at least
106+
// 1 character longer than the corresponding extension ".cfm":
107+
// strlen(".cfm") == 4 therefore we assume that the filename can't be
108+
// shorter than 5 bytes, for example: "5.cfm".
109+
return strlen(filepath) >= 5 && strcmp(&filepath[len-4], ".cfm") == 0;
110+
}
111+
112+
#ifdef PGPRO_EE
113+
/*
114+
* Determines the relation file size specified by fullpath as if it
115+
* was not compressed.
116+
*/
117+
off_t
118+
get_cfs_relation_file_decompressed_size(RelFileNodeBackend rnode, const char *fullpath, ForkNumber forknum) {
119+
File fd;
120+
int compressor;
121+
off_t size;
122+
123+
compressor = md_get_compressor_internal(rnode.node, rnode.backend, forknum);
124+
fd = PathNameOpenFile(fullpath, O_RDWR | PG_BINARY, compressor);
125+
126+
if(fd < 0)
127+
return (off_t)-1;
128+
129+
size = FileSize(fd);
130+
FileClose(fd);
131+
132+
return size;
133+
}
134+
#endif
135+
94136
/*
95137
* Delete ptrack files when ptrack is disabled.
96138
*
@@ -498,8 +540,13 @@ assign_ptrack_map_size(int newval, void *extra)
498540
* For use in functions that copy directories bypassing buffer manager.
499541
*/
500542
static void
543+
#ifdef PGPRO_EE
544+
ptrack_mark_file(Oid dbOid, Oid tablespaceOid,
545+
const char *filepath, const char *filename, bool is_cfs)
546+
#else
501547
ptrack_mark_file(Oid dbOid, Oid tablespaceOid,
502548
const char *filepath, const char *filename)
549+
#endif
503550
{
504551
RelFileNodeBackend rnode;
505552
ForkNumber forknum;
@@ -508,6 +555,9 @@ ptrack_mark_file(Oid dbOid, Oid tablespaceOid,
508555
struct stat stat_buf;
509556
int oidchars;
510557
char oidbuf[OIDCHARS + 1];
558+
#ifdef PGPRO_EE
559+
off_t rel_size;
560+
#endif
511561

512562
/* Do not track temporary relations */
513563
if (looks_like_temp_rel_name(filename))
@@ -526,6 +576,21 @@ ptrack_mark_file(Oid dbOid, Oid tablespaceOid,
526576
oidbuf[oidchars] = '\0';
527577
nodeRel(nodeOf(rnode)) = atooid(oidbuf);
528578

579+
#ifdef PGPRO_EE
580+
// if current tablespace is cfs-compressed and md_get_compressor_internal
581+
// returns the type of the compressing algorithm for filepath, then it
582+
// needs to be de-compressed to obtain its size
583+
if(is_cfs && md_get_compressor_internal(rnode.node, rnode.backend, forknum) != 0) {
584+
rel_size = get_cfs_relation_file_decompressed_size(rnode, filepath, forknum);
585+
586+
if(rel_size == (off_t)-1) {
587+
elog(WARNING, "ptrack: could not open cfs-compressed relation file: %s", filepath);
588+
return;
589+
}
590+
591+
nblocks = rel_size / BLCKSZ;
592+
} else
593+
#endif
529594
/* Compute number of blocks based on file size */
530595
if (stat(filepath, &stat_buf) == 0)
531596
nblocks = stat_buf.st_size / BLCKSZ;
@@ -546,6 +611,9 @@ ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid)
546611
{
547612
DIR *dir;
548613
struct dirent *de;
614+
#ifdef PGPRO_EE
615+
bool is_cfs;
616+
#endif
549617

550618
/* Do not walk during bootstrap and if ptrack is disabled */
551619
if (ptrack_map_size == 0
@@ -554,6 +622,10 @@ ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid)
554622
|| InitializingParallelWorker)
555623
return;
556624

625+
#ifdef PGPRO_EE
626+
is_cfs = file_is_in_cfs_tablespace(path);
627+
#endif
628+
557629
dir = AllocateDir(path);
558630

559631
while ((de = ReadDirExtended(dir, path, LOG)) != NULL)
@@ -581,7 +653,11 @@ ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid)
581653
}
582654

583655
if (S_ISREG(fst.st_mode))
656+
#ifdef PGPRO_EE
657+
ptrack_mark_file(dbOid, tablespaceOid, subpath, de->d_name, is_cfs);
658+
#else
584659
ptrack_mark_file(dbOid, tablespaceOid, subpath, de->d_name);
660+
#endif
585661
}
586662

587663
FreeDir(dir); /* we ignore any error here */

Diff for: engine.h

+6
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,10 @@ extern void ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid);
111111
extern void ptrack_mark_block(RelFileNodeBackend smgr_rnode,
112112
ForkNumber forkno, BlockNumber blkno);
113113

114+
extern bool is_cfm_file_path(const char *path);
115+
#ifdef PGPRO_EE
116+
extern off_t get_cfs_relation_file_decompressed_size(RelFileNodeBackend rnode,
117+
const char *fullpath, ForkNumber forknum);
118+
#endif
119+
114120
#endif /* PTRACK_ENGINE_H */

Diff for: ptrack.c

+31-11
Original file line numberDiff line numberDiff line change
@@ -251,14 +251,6 @@ ptrack_copydir_hook(const char *path)
251251

252252
elog(DEBUG1, "ptrack_copydir_hook: spcOid %u, dbOid %u", spcOid, dbOid);
253253

254-
#ifdef PGPRO_EE
255-
/*
256-
* Currently, we do not track files from compressed tablespaces in ptrack.
257-
*/
258-
if (file_is_in_cfs_tablespace(path))
259-
elog(DEBUG1, "ptrack_copydir_hook: skipping changes tracking in the CFS tablespace %u", spcOid);
260-
else
261-
#endif
262254
ptrack_walkdir(path, spcOid, dbOid);
263255

264256
if (prev_copydir_hook)
@@ -302,6 +294,11 @@ ptrack_gather_filelist(List **filelist, char *path, Oid spcOid, Oid dbOid)
302294
{
303295
DIR *dir;
304296
struct dirent *de;
297+
#ifdef PGPRO_EE
298+
bool is_cfs;
299+
300+
is_cfs = file_is_in_cfs_tablespace(path);
301+
#endif
305302

306303
dir = AllocateDir(path);
307304

@@ -315,7 +312,8 @@ ptrack_gather_filelist(List **filelist, char *path, Oid spcOid, Oid dbOid)
315312

316313
if (strcmp(de->d_name, ".") == 0 ||
317314
strcmp(de->d_name, "..") == 0 ||
318-
looks_like_temp_rel_name(de->d_name))
315+
looks_like_temp_rel_name(de->d_name) ||
316+
is_cfm_file_path(de->d_name))
319317
continue;
320318

321319
snprintf(subpath, sizeof(subpath), "%s/%s", path, de->d_name);
@@ -362,6 +360,10 @@ ptrack_gather_filelist(List **filelist, char *path, Oid spcOid, Oid dbOid)
362360
nodeSpc(pfl->relnode) = spcOid == InvalidOid ? DEFAULTTABLESPACE_OID : spcOid;
363361
pfl->path = GetRelationPath(dbOid, nodeSpc(pfl->relnode),
364362
nodeRel(pfl->relnode), InvalidBackendId, pfl->forknum);
363+
#ifdef PGPRO_EE
364+
pfl->is_cfs_compressed = is_cfs
365+
&& md_get_compressor_internal(pfl->relnode, InvalidBackendId, pfl->forknum) != 0;
366+
#endif
365367

366368
*filelist = lappend(*filelist, pfl);
367369

@@ -403,6 +405,10 @@ ptrack_filelist_getnext(PtScanCtx * ctx)
403405
ListCell *cell;
404406
char *fullpath;
405407
struct stat fst;
408+
off_t rel_st_size = 0;
409+
#ifdef PGPRO_EE
410+
RelFileNodeBackend rnodebackend;
411+
#endif
406412

407413
/* No more file in the list */
408414
if (list_length(ctx->filelist) == 0)
@@ -449,14 +455,28 @@ ptrack_filelist_getnext(PtScanCtx * ctx)
449455
return ptrack_filelist_getnext(ctx);
450456
}
451457

458+
#ifdef PGPRO_EE
459+
rnodebackend.node = ctx->bid.relnode;
460+
rnodebackend.backend = InvalidBackendId;
461+
462+
if(pfl->is_cfs_compressed) {
463+
rel_st_size = get_cfs_relation_file_decompressed_size(rnodebackend, fullpath, pfl->forknum);
464+
465+
// Could not open fullpath for some reason, trying the next file.
466+
if(rel_st_size == -1)
467+
return ptrack_filelist_getnext(ctx);
468+
} else
469+
#endif
470+
rel_st_size = fst.st_size;
471+
452472
if (pfl->segno > 0)
453473
{
454-
ctx->relsize = pfl->segno * RELSEG_SIZE + fst.st_size / BLCKSZ;
474+
ctx->relsize = pfl->segno * RELSEG_SIZE + rel_st_size / BLCKSZ;
455475
ctx->bid.blocknum = pfl->segno * RELSEG_SIZE;
456476
}
457477
else
458478
/* Estimate relsize as size of first segment in blocks */
459-
ctx->relsize = fst.st_size / BLCKSZ;
479+
ctx->relsize = rel_st_size / BLCKSZ;
460480

461481
elog(DEBUG3, "ptrack: got file %s with size %u from the file list", pfl->path, ctx->relsize);
462482

Diff for: ptrack.h

+3
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ typedef struct PtrackFileList_i
7878
ForkNumber forknum;
7979
int segno;
8080
char *path;
81+
#ifdef PGPRO_EE
82+
bool is_cfs_compressed;
83+
#endif
8184
} PtrackFileList_i;
8285

8386
#endif /* PTRACK_H */

Diff for: t/002_cfs_compatibility.pl

+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use strict;
2+
use warnings;
3+
use Test::More;
4+
5+
my $pg_15_modules;
6+
7+
BEGIN
8+
{
9+
$pg_15_modules = eval
10+
{
11+
require PostgreSQL::Test::Cluster;
12+
require PostgreSQL::Test::Utils;
13+
return 1;
14+
};
15+
16+
unless (defined $pg_15_modules)
17+
{
18+
$pg_15_modules = 0;
19+
20+
require PostgresNode;
21+
require TestLib;
22+
}
23+
}
24+
25+
note('PostgreSQL 15 modules are used: ' . ($pg_15_modules ? 'yes' : 'no'));
26+
27+
my $node;
28+
my $res_stdout;
29+
my $res_stderr;
30+
31+
# Create node.
32+
# Older versions of PostgreSQL modules use get_new_node function.
33+
# Newer use standard perl object constructor syntax.
34+
eval
35+
{
36+
if ($pg_15_modules)
37+
{
38+
$node = PostgreSQL::Test::Cluster->new("node");
39+
}
40+
else
41+
{
42+
$node = PostgresNode::get_new_node("node");
43+
}
44+
};
45+
46+
note "Test for handling a ptrack map in compressed relations";
47+
48+
my $psql_stdout;
49+
50+
# Starting the node
51+
$node->init;
52+
53+
# Could not load ptrack module after postmaster start
54+
55+
my $cfs_tblspc1 = $node->basedir."/cfs_tblspc1";
56+
my $cfs_tblspc2 = $node->basedir."/cfs_tblspc2";
57+
mkdir $cfs_tblspc1 or die;
58+
mkdir $cfs_tblspc2 or die;
59+
my $no_cfs_tblspc1 = $node->basedir."/no_cfs_tblspc1";
60+
my $no_cfs_tblspc2 = $node->basedir."/no_cfs_tblspc2";
61+
mkdir $no_cfs_tblspc1 or die;
62+
mkdir $no_cfs_tblspc2 or die;
63+
64+
$node->append_conf('postgresql.conf', qq{
65+
shared_preload_libraries = 'ptrack'
66+
ptrack.map_size = 16
67+
wal_level = 'replica'
68+
});
69+
70+
$node->start;
71+
72+
# check cfs availability first
73+
my $cfs_available = $node->safe_psql('postgres',
74+
"select count(oid) from pg_proc where proname = 'cfs_version'");
75+
76+
if($cfs_available eq "0") {
77+
$node->stop;
78+
plan skip_all => "CFS is not supported by this PostgreSQL build";
79+
} else {
80+
plan tests => 2;
81+
}
82+
83+
# Creating content
84+
$node->safe_psql('postgres', qq|
85+
create tablespace cfs_tblspc1 location '$cfs_tblspc1' with (compression=true);
86+
create tablespace cfs_tblspc2 location '$cfs_tblspc2' with (compression=true);
87+
create tablespace no_cfs_tblspc1 location '$no_cfs_tblspc1';
88+
create tablespace no_cfs_tblspc2 location '$no_cfs_tblspc2';
89+
90+
create database testing_cfs tablespace cfs_tblspc1;
91+
create database testing_no_cfs tablespace no_cfs_tblspc1;
92+
|);
93+
94+
$node->safe_psql('testing_cfs', qq{
95+
create table testing(i int, text varchar);
96+
insert into testing select 1, '1111111111111111111111111' from generate_series(1,10000000);
97+
});
98+
99+
$node->safe_psql('testing_no_cfs', qq{
100+
create table testing_no(i int, text varchar);
101+
insert into testing_no select 1, '1111111111111111111111111' from generate_series(1,10000000);
102+
});
103+
104+
# creating ptrack
105+
$node->safe_psql('postgres', "create extension ptrack");
106+
107+
# obtaining init lsn for further usage in ptrack_get_pagemapset
108+
my $init_lsn = $node->safe_psql('postgres', 'select ptrack_init_lsn()');
109+
110+
# forcing copydir() hook by altering dbs tablespaces
111+
$node->safe_psql('postgres', "alter database testing_cfs set tablespace cfs_tblspc2;");
112+
$node->safe_psql('postgres', "alter database testing_no_cfs set tablespace no_cfs_tblspc2;");
113+
114+
# obtaining relpath for cfs table
115+
my $cfs_relpath = $node->safe_psql('testing_cfs', "select pg_relation_filepath('testing');");
116+
117+
# obtaining relpath for no-cfs table
118+
my $no_cfs_relpath = $node->safe_psql('testing_no_cfs', "select pg_relation_filepath('testing_no');");
119+
120+
# select the pagecount sums and compare them (should be equal)
121+
my $pagecount_sum_cfs = $node->safe_psql('postgres',
122+
"select sum(pagecount) from ptrack_get_pagemapset('$init_lsn'::pg_lsn) where path like '%$cfs_relpath%';");
123+
my $pagecount_sum_no_cfs = $node->safe_psql('postgres',
124+
"select sum(pagecount) from ptrack_get_pagemapset('$init_lsn'::pg_lsn) where path like '%$no_cfs_relpath%';");
125+
126+
is($pagecount_sum_cfs, $pagecount_sum_no_cfs, "pagecount sums don't match");
127+
128+
# forcing copydir() hook by altering dbs tablespaces back
129+
$node->safe_psql('postgres', "alter database testing_cfs set tablespace cfs_tblspc1;");
130+
$node->safe_psql('postgres', "alter database testing_no_cfs set tablespace no_cfs_tblspc1;");
131+
132+
# obtaining new relpath for cfs table
133+
$cfs_relpath = $node->safe_psql('testing_cfs', "select pg_relation_filepath('testing');");
134+
135+
# obtaining new relpath for no-cfs table
136+
$no_cfs_relpath = $node->safe_psql('testing_no_cfs', "select pg_relation_filepath('testing_no');");
137+
138+
# select the pagecount sums and compare them (again, they should be equal)
139+
$pagecount_sum_cfs = $node->safe_psql('postgres',
140+
"select sum(pagecount) from ptrack_get_pagemapset('$init_lsn'::pg_lsn) where path like '%$cfs_relpath%';");
141+
$pagecount_sum_no_cfs = $node->safe_psql('postgres',
142+
"select sum(pagecount) from ptrack_get_pagemapset('$init_lsn'::pg_lsn) where path like '%$no_cfs_relpath%';");
143+
144+
is($pagecount_sum_cfs, $pagecount_sum_no_cfs, "pagecount sums don't match");
145+
146+
147+
$node->stop;
148+

0 commit comments

Comments
 (0)