Skip to content

Commit fa170d7

Browse files
committed
(testing) prototype for async ptrack map file reading
1 parent 8b30346 commit fa170d7

File tree

3 files changed

+255
-18
lines changed

3 files changed

+255
-18
lines changed

.travis.yml

+16-16
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,26 @@ notifications:
2121

2222
# keep in sync with codecov.yml number of builds
2323
env:
24-
- PG_BRANCH=master TEST_CASE=tap
25-
- PG_BRANCH=master TEST_CASE=tap MODE=legacy
26-
# - PG_BRANCH=master TEST_CASE=all
27-
- PG_BRANCH=master TEST_CASE=all MODE=paranoia
28-
- PG_BRANCH=REL_14_STABLE TEST_CASE=tap
29-
- PG_BRANCH=REL_14_STABLE TEST_CASE=tap MODE=legacy
30-
# - PG_BRANCH=REL_14_STABLE TEST_CASE=all
31-
- PG_BRANCH=REL_14_STABLE TEST_CASE=all MODE=paranoia
24+
# - PG_BRANCH=master TEST_CASE=tap
25+
# - PG_BRANCH=master TEST_CASE=tap MODE=legacy
26+
## - PG_BRANCH=master TEST_CASE=all
27+
# - PG_BRANCH=master TEST_CASE=all MODE=paranoia
28+
# - PG_BRANCH=REL_14_STABLE TEST_CASE=tap
29+
# - PG_BRANCH=REL_14_STABLE TEST_CASE=tap MODE=legacy
30+
## - PG_BRANCH=REL_14_STABLE TEST_CASE=all
31+
# - PG_BRANCH=REL_14_STABLE TEST_CASE=all MODE=paranoia
3232
- PG_BRANCH=REL_13_STABLE TEST_CASE=tap
3333
- PG_BRANCH=REL_13_STABLE TEST_CASE=tap MODE=legacy
3434
# - PG_BRANCH=REL_13_STABLE TEST_CASE=all
3535
- PG_BRANCH=REL_13_STABLE TEST_CASE=all MODE=paranoia
36-
- PG_BRANCH=REL_12_STABLE TEST_CASE=tap
37-
- PG_BRANCH=REL_12_STABLE TEST_CASE=tap MODE=legacy
38-
# - PG_BRANCH=REL_12_STABLE TEST_CASE=all
39-
- PG_BRANCH=REL_12_STABLE TEST_CASE=all MODE=paranoia
40-
- PG_BRANCH=REL_11_STABLE TEST_CASE=tap
41-
- PG_BRANCH=REL_11_STABLE TEST_CASE=tap MODE=legacy
42-
# - PG_BRANCH=REL_11_STABLE TEST_CASE=all
43-
- PG_BRANCH=REL_11_STABLE TEST_CASE=all MODE=paranoia
36+
# - PG_BRANCH=REL_12_STABLE TEST_CASE=tap
37+
# - PG_BRANCH=REL_12_STABLE TEST_CASE=tap MODE=legacy
38+
## - PG_BRANCH=REL_12_STABLE TEST_CASE=all
39+
# - PG_BRANCH=REL_12_STABLE TEST_CASE=all MODE=paranoia
40+
# - PG_BRANCH=REL_11_STABLE TEST_CASE=tap
41+
# - PG_BRANCH=REL_11_STABLE TEST_CASE=tap MODE=legacy
42+
## - PG_BRANCH=REL_11_STABLE TEST_CASE=all
43+
# - PG_BRANCH=REL_11_STABLE TEST_CASE=all MODE=paranoia
4444

4545
jobs:
4646
allow_failures:

engine.c

+223-2
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,15 @@
2626
#include <sys/mman.h>
2727
#endif
2828

29+
#include <aio.h>
30+
2931
#include "access/htup_details.h"
3032
#include "access/parallel.h"
3133
#include "access/xlog.h"
3234
#include "catalog/pg_tablespace.h"
3335
#include "miscadmin.h"
3436
#include "port/pg_crc32c.h"
37+
#include "portability/instr_time.h"
3538
#include "storage/copydir.h"
3639
#if PG_VERSION_NUM >= 120000
3740
#include "storage/md.h"
@@ -116,10 +119,20 @@ ptrackCleanFiles(void)
116119
* This function is called only at startup,
117120
* so data is read directly (without synchronization).
118121
*/
122+
#ifndef PTRACK_USE_AIO
123+
#define ptrackMapReadFromFile ptrackMapReadFromFileSync
124+
#else
125+
#define ptrackMapReadFromFile ptrackMapReadFromFileAsync
126+
#endif
127+
128+
#ifndef PTRACK_USE_AIO
119129
static bool
120-
ptrackMapReadFromFile(const char *ptrack_path)
130+
ptrackMapReadFromFileSync(const char *ptrack_path)
121131
{
122-
elog(DEBUG1, "ptrack read map");
132+
instr_time func_start, func_end, crc_start, func_time, read_time, crc_time;
133+
134+
elog(LOG, "ptrack read map (sync version): start, ptrack_actual_size %zu bytes", PtrackActualSize);
135+
INSTR_TIME_SET_CURRENT(func_start);
123136

124137
/* Do actual file read */
125138
{
@@ -169,6 +182,8 @@ ptrackMapReadFromFile(const char *ptrack_path)
169182
close(ptrack_fd);
170183
}
171184

185+
INSTR_TIME_SET_CURRENT(crc_start);
186+
172187
/* Check PTRACK_MAGIC */
173188
if (strcmp(ptrack_map->magic, PTRACK_MAGIC) != 0)
174189
{
@@ -215,8 +230,214 @@ ptrackMapReadFromFile(const char *ptrack_path)
215230
}
216231
}
217232

233+
INSTR_TIME_SET_CURRENT(func_end);
234+
235+
INSTR_TIME_SET_ZERO(func_time);
236+
INSTR_TIME_ACCUM_DIFF(func_time, func_end, func_start);
237+
INSTR_TIME_SET_ZERO(read_time);
238+
INSTR_TIME_ACCUM_DIFF(read_time, crc_start, func_start);
239+
INSTR_TIME_SET_ZERO(crc_time);
240+
INSTR_TIME_ACCUM_DIFF(crc_time, func_end, crc_start);
241+
elog(LOG, "ptrack read map (sync version): end. Timings (microseconds): file io time = %lu, crc time = %lu, overall time = %lu",
242+
INSTR_TIME_GET_MICROSEC(read_time), INSTR_TIME_GET_MICROSEC(crc_time), INSTR_TIME_GET_MICROSEC(func_time));
218243
return true;
219244
}
245+
#endif
246+
247+
#ifdef PTRACK_USE_AIO
248+
static bool
249+
ptrackMapReadFromFileAsync(const char *ptrack_path)
250+
{
251+
instr_time func_start, func_end, func_time,
252+
aio_start, aio_end, aio_time,
253+
crc_start, crc_end, crc_time;
254+
int ptrack_fd;
255+
pg_crc32c crc;
256+
pg_crc32c *file_crc;
257+
char *read_ptr = (char *) ptrack_map;
258+
char *readed_ptr = (char *) ptrack_map;
259+
char *crc_ptr = (char *) ptrack_map;
260+
struct aiocb io_requests[PTRACK_AIO_READ_QUEUE_DEPTH];
261+
int io_requests_head = 0;
262+
int io_requests_tail = 0;
263+
int io_pending_requests = 0;
264+
size_t to_read;
265+
266+
elog(LOG, "ptrack read map (AIO version): start, ptrack_actual_size %zu bytes, aio_read_size %zu, aio_queue_size %i",
267+
PtrackActualSize, (size_t) PTRACK_AIO_READ_CHUNK, PTRACK_AIO_READ_QUEUE_DEPTH);
268+
INSTR_TIME_SET_ZERO(func_time);
269+
INSTR_TIME_SET_ZERO(aio_time);
270+
INSTR_TIME_SET_ZERO(crc_time);
271+
INSTR_TIME_SET_CURRENT(func_start);
272+
273+
INSTR_TIME_SET_CURRENT(aio_start);
274+
ptrack_fd = BasicOpenFile(ptrack_path, O_RDWR | PG_BINARY);
275+
if (ptrack_fd < 0)
276+
elog(ERROR, "ptrack read map: failed to open map file \"%s\": %m", ptrack_path);
277+
INSTR_TIME_SET_CURRENT(aio_end);
278+
INSTR_TIME_ACCUM_DIFF(aio_time, aio_end, aio_start);
279+
280+
INSTR_TIME_SET_CURRENT(crc_start);
281+
INIT_CRC32C(crc);
282+
file_crc = (pg_crc32c *) ((char *) ptrack_map + PtrackCrcOffset);
283+
INSTR_TIME_SET_CURRENT(crc_end);
284+
INSTR_TIME_ACCUM_DIFF(crc_time, crc_end, crc_start);
285+
286+
do
287+
{
288+
to_read = Min(PTRACK_AIO_READ_CHUNK, PtrackActualSize - (read_ptr - (char *) ptrack_map));
289+
290+
// Wait AIO read for completion
291+
// if there no more space in queue
292+
if (io_pending_requests == PTRACK_AIO_READ_QUEUE_DEPTH || to_read == 0)
293+
{
294+
int rc;
295+
const struct aiocb *wait_op = &(io_requests[io_requests_tail]);
296+
struct timespec timeout = { .tv_sec = 0, .tv_nsec = PTRACK_AIO_SUSPEND_TIMEOUT_NS };
297+
INSTR_TIME_SET_CURRENT(aio_start);
298+
// we always wait only one (most early) operation
299+
do
300+
{
301+
elog(DEBUG1, "ptrack read map: aio_suspend: io_requests_tail = %i, io_pending_requests = %i",
302+
io_requests_tail, io_pending_requests);
303+
rc = aio_suspend(&wait_op, 1, &timeout);
304+
if (rc != 0 && errno != EAGAIN && errno != EINTR)
305+
{
306+
elog(ERROR, "ptrack read map: failed to call aio_suspend: %m");
307+
}
308+
} while(rc != 0);
309+
INSTR_TIME_SET_CURRENT(aio_end);
310+
INSTR_TIME_ACCUM_DIFF(aio_time, aio_end, aio_start);
311+
}
312+
313+
// Try to consume AIO operation
314+
// and advance readed_ptr
315+
if (io_pending_requests > 0)
316+
{
317+
int rc;
318+
INSTR_TIME_SET_CURRENT(aio_start);
319+
320+
elog(DEBUG1, "ptrack read map: aio_error: io_requests_tail = %i, io_pending_requests = %i",
321+
io_requests_tail, io_pending_requests);
322+
rc = aio_error(&(io_requests[io_requests_tail]));
323+
if (rc == 0)
324+
{
325+
io_pending_requests--;
326+
rc = aio_return(&(io_requests[io_requests_tail]));
327+
if (rc > 0)
328+
{
329+
readed_ptr += rc;
330+
io_requests_tail = (io_requests_tail + 1) % PTRACK_AIO_READ_QUEUE_DEPTH;
331+
}
332+
else
333+
{
334+
elog(ERROR, "ptrack read map: failed to call aio_return: %m");
335+
}
336+
}
337+
else if (rc == EINPROGRESS)
338+
{
339+
elog(DEBUG1, "ptrack read map: aio_error: operation still in progress, io_requests_tail = %i, io_pending_requests = %i",
340+
io_requests_tail, io_pending_requests);
341+
}
342+
else
343+
{
344+
elog(ERROR, "ptrack read map: failed to call aio_error: %m");
345+
}
346+
INSTR_TIME_SET_CURRENT(aio_end);
347+
INSTR_TIME_ACCUM_DIFF(aio_time, aio_end, aio_start);
348+
}
349+
350+
// Put AIO request if needed and if there is room for it
351+
// and advance read_ptr
352+
if (io_pending_requests < PTRACK_AIO_READ_QUEUE_DEPTH && to_read > 0)
353+
{
354+
int rc;
355+
INSTR_TIME_SET_CURRENT(aio_start);
356+
memset(&(io_requests[io_requests_head]), 0, sizeof(io_requests[0]));
357+
io_requests[io_requests_head] = (struct aiocb) {
358+
.aio_fildes = ptrack_fd,
359+
.aio_offset = read_ptr - (char *) ptrack_map,
360+
.aio_buf = read_ptr,
361+
.aio_nbytes = to_read,
362+
.aio_reqprio = 0,
363+
.aio_sigevent = {
364+
.sigev_notify = SIGEV_NONE,
365+
// fill other fields, see sigevent(7)
366+
},
367+
.aio_lio_opcode = LIO_NOP,
368+
};
369+
elog(DEBUG1, "ptrack read map: aio_read: io_requests_head = %i, .aio_offset = %zu, .aio_nbytes = %zu, io_pending_requests = %i",
370+
io_requests_head, io_requests[io_requests_head].aio_offset, io_requests[io_requests_head].aio_nbytes, io_pending_requests + 1);
371+
rc = aio_read(&(io_requests[io_requests_head]));
372+
// check errors!
373+
read_ptr += to_read;
374+
io_pending_requests++;
375+
io_requests_head = (io_requests_head + 1) % PTRACK_AIO_READ_QUEUE_DEPTH;
376+
INSTR_TIME_SET_CURRENT(aio_end);
377+
INSTR_TIME_ACCUM_DIFF(aio_time, aio_end, aio_start);
378+
}
379+
380+
// Calculate CRC (and advance crc_ptr)
381+
if (crc_ptr < Min(readed_ptr, (char *) file_crc))
382+
{
383+
size_t crc_chunk_size = Min(readed_ptr, (char *) file_crc) - crc_ptr;
384+
INSTR_TIME_SET_CURRENT(crc_start);
385+
elog(DEBUG1, "ptrack read map: COMP_CRC32C: offset = %zu, nbytes = %zu",
386+
crc_ptr - (char *) ptrack_map, crc_chunk_size);
387+
COMP_CRC32C(crc, crc_ptr, crc_chunk_size);
388+
crc_ptr += crc_chunk_size;
389+
INSTR_TIME_SET_CURRENT(crc_end);
390+
INSTR_TIME_ACCUM_DIFF(crc_time, crc_end, crc_start);
391+
}
392+
} while (crc_ptr < (char *) file_crc || to_read > 0);
393+
394+
INSTR_TIME_SET_CURRENT(aio_start);
395+
close(ptrack_fd);
396+
INSTR_TIME_SET_CURRENT(aio_end);
397+
INSTR_TIME_ACCUM_DIFF(aio_time, aio_end, aio_start);
398+
399+
INSTR_TIME_SET_CURRENT(crc_start);
400+
/* Check PTRACK_MAGIC */
401+
if (strcmp(ptrack_map->magic, PTRACK_MAGIC) != 0)
402+
{
403+
elog(WARNING, "ptrack read map: wrong map format of file \"%s\"", ptrack_path);
404+
return false;
405+
}
406+
407+
/* Check ptrack version inside old ptrack map */
408+
if (ptrack_map->version_num != PTRACK_MAP_FILE_VERSION_NUM)
409+
{
410+
ereport(WARNING,
411+
(errcode(ERRCODE_DATA_CORRUPTED),
412+
errmsg("ptrack read map: map format version %d in the file \"%s\" is incompatible with file format of extension %d",
413+
ptrack_map->version_num, ptrack_path, PTRACK_MAP_FILE_VERSION_NUM),
414+
errdetail("Deleting file \"%s\" and reinitializing ptrack map.", ptrack_path)));
415+
return false;
416+
}
417+
418+
/* Check CRC */
419+
FIN_CRC32C(crc);
420+
elog(DEBUG1, "ptrack read map: crc %u, file_crc %u, init_lsn %X/%X",
421+
crc, *file_crc, (uint32) (ptrack_map->init_lsn.value >> 32), (uint32) ptrack_map->init_lsn.value);
422+
423+
if (!EQ_CRC32C(*file_crc, crc))
424+
{
425+
ereport(WARNING,
426+
(errcode(ERRCODE_DATA_CORRUPTED),
427+
errmsg("ptrack read map: incorrect checksum of file \"%s\"", ptrack_path),
428+
errdetail("Deleting file \"%s\" and reinitializing ptrack map.", ptrack_path)));
429+
return false;
430+
}
431+
INSTR_TIME_SET_CURRENT(crc_end);
432+
INSTR_TIME_ACCUM_DIFF(crc_time, crc_end, crc_start);
433+
434+
INSTR_TIME_SET_CURRENT(func_end);
435+
INSTR_TIME_ACCUM_DIFF(func_time, func_end, func_start);
436+
elog(LOG, "ptrack read map (AIO version): end. Timings (microseconds): file aio calls time = %lu, crc time = %lu, overall time = %lu",
437+
INSTR_TIME_GET_MICROSEC(aio_time), INSTR_TIME_GET_MICROSEC(crc_time), INSTR_TIME_GET_MICROSEC(func_time));
438+
return true;
439+
}
440+
#endif
220441

221442
/*
222443
* Read PTRACK_PATH file into already allocated shared memory, check header and checksum

engine.h

+16
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,22 @@ typedef PtrackMapHdr * PtrackMap;
8989
#define BID_HASH_FUNC(bid) \
9090
(DatumGetUInt64(hash_any_extended((unsigned char *)&bid, sizeof(bid), 0)))
9191

92+
/* async io section*/
93+
94+
/* use aio ptrack map read if defined */
95+
#define PTRACK_USE_AIO
96+
97+
/* size of one async read operation (bytes) */
98+
#define PTRACK_AIO_READ_CHUNK 1024*1024
99+
100+
/* maximum count of pending aio read operations */
101+
#define PTRACK_AIO_READ_QUEUE_DEPTH 4
102+
103+
/* aio_suspend timeout parameter value (in nanoseconds, ie 5E7 is 50 milliseconds. can't be more or equal to 1 second) */
104+
#define PTRACK_AIO_SUSPEND_TIMEOUT_NS 5E7
105+
106+
/* end of async io section */
107+
92108
/*
93109
* Per process pointer to shared ptrack_map
94110
*/

0 commit comments

Comments
 (0)