diff --git a/test/integration/test_fsm.c b/test/integration/test_fsm.c index c088ffb21..54515935c 100644 --- a/test/integration/test_fsm.c +++ b/test/integration/test_fsm.c @@ -50,11 +50,16 @@ /* Use the client connected to the server with the given ID. */ #define SELECT(ID) f->client = test_server_client(&f->servers[ID - 1]) +static char* bools[] = { + "0", "1", NULL +}; + /* Make sure the snapshots scheduled by raft don't interfere with the snapshots * scheduled by the tests. */ static char *snapshot_threshold[] = {"8192", NULL}; static MunitParameterEnum snapshot_params[] = { {SNAPSHOT_THRESHOLD_PARAM, snapshot_threshold}, + { "disk_mode", bools }, {NULL, NULL}, }; @@ -93,10 +98,21 @@ TEST(fsm, snapshotFreshDb, setUp, tearDown, 0, snapshot_params) unsigned n_bufs = 0; int rv; + bool disk_mode = false; + const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); + if (disk_mode_param != NULL) { + disk_mode = (bool)atoi(disk_mode_param); + } + rv = fsm->snapshot(fsm, &bufs, &n_bufs); munit_assert_int(rv, ==, 0); munit_assert_uint(n_bufs, ==, 1); /* Snapshot header */ + if (disk_mode) { + rv = fsm->snapshot_async(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + } + rv = fsm->snapshot_finalize(fsm, &bufs, &n_bufs); munit_assert_int(rv, ==, 0); munit_assert_ptr_null(bufs); @@ -116,6 +132,12 @@ TEST(fsm, snapshotWrittenDb, setUp, tearDown, 0, snapshot_params) unsigned last_insert_id; unsigned rows_affected; + bool disk_mode = false; + const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); + if (disk_mode_param != NULL) { + disk_mode = (bool)atoi(disk_mode_param); + } + /* Add some data to database */ HANDSHAKE; OPEN; @@ -128,6 +150,11 @@ TEST(fsm, snapshotWrittenDb, setUp, tearDown, 0, snapshot_params) munit_assert_int(rv, ==, 0); munit_assert_uint(n_bufs, >, 1); + if (disk_mode) { + rv = fsm->snapshot_async(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + } + rv = fsm->snapshot_finalize(fsm, &bufs, &n_bufs); munit_assert_int(rv, ==, 0); munit_assert_ptr_null(bufs); @@ -147,6 +174,12 @@ TEST(fsm, snapshotHeapFaultSingleDB, setUp, tearDown, 0, snapshot_params) unsigned last_insert_id; unsigned rows_affected; + bool disk_mode = false; + const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); + if (disk_mode_param != NULL) { + disk_mode = (bool)atoi(disk_mode_param); + } + /* Add some data to database */ HANDSHAKE; OPEN; @@ -169,10 +202,61 @@ TEST(fsm, snapshotHeapFaultSingleDB, setUp, tearDown, 0, snapshot_params) rv = fsm->snapshot(fsm, &bufs, &n_bufs); munit_assert_int(rv, !=, 0); - test_heap_fault_config(3, 1); + /* disk_mode does fewer allocations */ + if (!disk_mode) { + test_heap_fault_config(3, 1); + rv = fsm->snapshot(fsm, &bufs, &n_bufs); + munit_assert_int(rv, !=, 0); + } + + return MUNIT_OK; +} + +/* Inject faults into the async stage of the snapshot process */ +TEST(fsm, snapshotHeapFaultSingleDBAsyncDisk, setUp, tearDown, 0, snapshot_params) +{ + struct fixture *f = data; + struct raft_fsm *fsm = &f->servers[0].dqlite->raft_fsm; + struct raft_buffer *bufs; + unsigned n_bufs = 0; + int rv; + + unsigned stmt_id; + unsigned last_insert_id; + unsigned rows_affected; + + bool disk_mode = false; + const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); + if (disk_mode_param != NULL) { + disk_mode = (bool)atoi(disk_mode_param); + } + + if (!disk_mode) { + return MUNIT_SKIP; + } + + /* Add some data to database */ + HANDSHAKE; + OPEN; + PREPARE("CREATE TABLE test (n INT)", &stmt_id); + EXEC(stmt_id, &last_insert_id, &rows_affected); + PREPARE("INSERT INTO test(n) VALUES(1)", &stmt_id); + EXEC(stmt_id, &last_insert_id, &rows_affected); + + /* Sync stage succeeds */ rv = fsm->snapshot(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + + /* Inject heap fault in first call to encodeDiskDatabaseAsync */ + test_heap_fault_config(0, 1); + test_heap_fault_enable(); + rv = fsm->snapshot_async(fsm, &bufs, &n_bufs); munit_assert_int(rv, !=, 0); + /* Cleanup should succeed */ + rv = fsm->snapshot_finalize(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + return MUNIT_OK; } @@ -188,6 +272,12 @@ TEST(fsm, snapshotHeapFaultTwoDB, setUp, tearDown, 0, snapshot_params) unsigned last_insert_id; unsigned rows_affected; + bool disk_mode = false; + const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); + if (disk_mode_param != NULL) { + disk_mode = (bool)atoi(disk_mode_param); + } + /* Open 2 databases and add data to them */ HANDSHAKE; OPEN_NAME("test"); @@ -224,14 +314,87 @@ TEST(fsm, snapshotHeapFaultTwoDB, setUp, tearDown, 0, snapshot_params) rv = fsm->snapshot(fsm, &bufs, &n_bufs); munit_assert_int(rv, !=, 0); - test_heap_fault_config(4, 1); + /* disk_mode does fewer allocations */ + if (!disk_mode) { + test_heap_fault_config(4, 1); + rv = fsm->snapshot(fsm, &bufs, &n_bufs); + munit_assert_int(rv, !=, 0); + + test_heap_fault_config(5, 1); + rv = fsm->snapshot(fsm, &bufs, &n_bufs); + munit_assert_int(rv, !=, 0); + } + + return MUNIT_OK; +} + +TEST(fsm, snapshotHeapFaultTwoDBAsync, setUp, tearDown, 0, snapshot_params) +{ + struct fixture *f = data; + struct raft_fsm *fsm = &f->servers[0].dqlite->raft_fsm; + struct raft_buffer *bufs; + unsigned n_bufs = 0; + int rv; + + unsigned stmt_id; + unsigned last_insert_id; + unsigned rows_affected; + + bool disk_mode = false; + const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); + if (disk_mode_param != NULL) { + disk_mode = (bool)atoi(disk_mode_param); + } + + if (!disk_mode) { + return MUNIT_SKIP; + } + + /* Open 2 databases and add data to them */ + HANDSHAKE; + OPEN_NAME("test"); + PREPARE("CREATE TABLE test (n INT)", &stmt_id); + EXEC(stmt_id, &last_insert_id, &rows_affected); + PREPARE("INSERT INTO test(n) VALUES(1)", &stmt_id); + EXEC(stmt_id, &last_insert_id, &rows_affected); + + /* Close and reopen the client and open a second database */ + test_server_client_reconnect(&f->servers[0], &f->servers[0].client); + + HANDSHAKE; + OPEN_NAME("test2"); + PREPARE("CREATE TABLE test (n INT)", &stmt_id); + EXEC(stmt_id, &last_insert_id, &rows_affected); + PREPARE("INSERT INTO test(n) VALUES(1)", &stmt_id); + EXEC(stmt_id, &last_insert_id, &rows_affected); + + /* sync fsm__snapshot succeeds. */ rv = fsm->snapshot(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + + /* async step fails at different stages. */ + test_heap_fault_enable(); + + test_heap_fault_config(0, 1); + rv = fsm->snapshot_async(fsm, &bufs, &n_bufs); munit_assert_int(rv, !=, 0); - test_heap_fault_config(5, 1); + rv = fsm->snapshot_finalize(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + + /* Inject fault when encoding second Database */ + + /* sync fsm__snapshot succeeds. */ rv = fsm->snapshot(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + + test_heap_fault_config(1, 1); + rv = fsm->snapshot_async(fsm, &bufs, &n_bufs); munit_assert_int(rv, !=, 0); + rv = fsm->snapshot_finalize(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + return MUNIT_OK; } @@ -247,6 +410,12 @@ TEST(fsm, snapshotNewDbAddedBeforeFinalize, setUp, tearDown, 0, snapshot_params) unsigned last_insert_id; unsigned rows_affected; + bool disk_mode = false; + const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); + if (disk_mode_param != NULL) { + disk_mode = (bool)atoi(disk_mode_param); + } + /* Add some data to database */ HANDSHAKE; OPEN_NAME("test"); @@ -267,6 +436,12 @@ TEST(fsm, snapshotNewDbAddedBeforeFinalize, setUp, tearDown, 0, snapshot_params) OPEN_NAME("test2"); PREPARE("CREATE TABLE test (n INT)", &stmt_id); EXEC(stmt_id, &last_insert_id, &rows_affected); + + if (disk_mode) { + rv = fsm->snapshot_async(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + } + PREPARE("INSERT INTO test(n) VALUES(1)", &stmt_id); EXEC(stmt_id, &last_insert_id, &rows_affected); @@ -289,6 +464,12 @@ TEST(fsm, snapshotWritesBeforeFinalize, setUp, tearDown, 0, snapshot_params) char sql[128]; int rv; + bool disk_mode = false; + const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); + if (disk_mode_param != NULL) { + disk_mode = (bool)atoi(disk_mode_param); + } + /* Add some data to database */ HANDSHAKE; OPEN; @@ -303,9 +484,13 @@ TEST(fsm, snapshotWritesBeforeFinalize, setUp, tearDown, 0, snapshot_params) /* Add (a lot) more data to the database */ for (unsigned i = 0; i < 1000; ++i) { - sprintf(sql, "INSERT INTO test(n) VALUES(%d)", i + 1); - PREPARE(sql, &stmt_id); - EXEC(stmt_id, &last_insert_id, &rows_affected); + sprintf(sql, "INSERT INTO test(n) VALUES(%d)", i + 1); + PREPARE(sql, &stmt_id); + EXEC(stmt_id, &last_insert_id, &rows_affected); + if (disk_mode && i == 512) { + rv = fsm->snapshot_async(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + } } /* Finalize succeeds */ @@ -321,6 +506,60 @@ TEST(fsm, snapshotWritesBeforeFinalize, setUp, tearDown, 0, snapshot_params) return MUNIT_OK; } +TEST(fsm, concurrentSnapshots, setUp, tearDown, 0, snapshot_params) +{ + struct fixture *f = data; + struct raft_fsm *fsm = &f->servers[0].dqlite->raft_fsm; + struct raft_buffer *bufs; + struct raft_buffer *bufs2; + unsigned n_bufs = 0; + unsigned n_bufs2 = 0; + unsigned stmt_id; + unsigned last_insert_id; + unsigned rows_affected; + int rv; + + bool disk_mode = false; + const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); + if (disk_mode_param != NULL) { + disk_mode = (bool)atoi(disk_mode_param); + } + + /* Add some data to database */ + HANDSHAKE; + OPEN; + PREPARE("CREATE TABLE test (n INT)", &stmt_id); + EXEC(stmt_id, &last_insert_id, &rows_affected); + + /* Second snapshot fails when first isn't finalized */ + rv = fsm->snapshot(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + rv = fsm->snapshot(fsm, &bufs2, &n_bufs2); + munit_assert_int(rv, ==, RAFT_BUSY); + + if (disk_mode) { + rv = fsm->snapshot_async(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + } + + rv = fsm->snapshot_finalize(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + + /* Second snapshot succeeds after first is finalized */ + rv = fsm->snapshot(fsm, &bufs2, &n_bufs2); + munit_assert_int(rv, ==, 0); + if (disk_mode) { + rv = fsm->snapshot_async(fsm, &bufs2, &n_bufs2); + munit_assert_int(rv, ==, 0); + } + + rv = fsm->snapshot_finalize(fsm, &bufs2, &n_bufs2); + munit_assert_int(rv, ==, 0); + + return MUNIT_OK; +} + + /* Copies n raft buffers to a single raft buffer */ static struct raft_buffer n_bufs_to_buf(struct raft_buffer bufs[], unsigned n) { @@ -356,6 +595,7 @@ static char* num_records[] = { static MunitParameterEnum restore_params[] = { { "num_records", num_records }, { SNAPSHOT_THRESHOLD_PARAM, snapshot_threshold}, + { "disk_mode", bools }, { NULL, NULL }, }; @@ -374,6 +614,13 @@ TEST(fsm, snapshotRestore, setUp, tearDown, 0, restore_params) int rv; char sql[128]; + bool disk_mode = false; + const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); + if (disk_mode_param != NULL) { + disk_mode = (bool)atoi(disk_mode_param); + } + + /* Add some data to database */ HANDSHAKE; OPEN; @@ -388,6 +635,11 @@ TEST(fsm, snapshotRestore, setUp, tearDown, 0, restore_params) rv = fsm->snapshot(fsm, &bufs, &n_bufs); munit_assert_int(rv, ==, 0); + if (disk_mode) { + rv = fsm->snapshot_async(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + } + /* Deep copy snapshot */ snapshot = n_bufs_to_buf(bufs, n_bufs); @@ -417,42 +669,6 @@ TEST(fsm, snapshotRestore, setUp, tearDown, 0, restore_params) return MUNIT_OK; } -TEST(fsm, concurrentSnapshots, setUp, tearDown, 0, snapshot_params) -{ - struct fixture *f = data; - struct raft_fsm *fsm = &f->servers[0].dqlite->raft_fsm; - struct raft_buffer *bufs; - struct raft_buffer *bufs2; - unsigned n_bufs = 0; - unsigned n_bufs2 = 0; - unsigned stmt_id; - unsigned last_insert_id; - unsigned rows_affected; - int rv; - - /* Add some data to database */ - HANDSHAKE; - OPEN; - PREPARE("CREATE TABLE test (n INT)", &stmt_id); - EXEC(stmt_id, &last_insert_id, &rows_affected); - - /* Second snapshot fails when first isn't finalized */ - rv = fsm->snapshot(fsm, &bufs, &n_bufs); - munit_assert_int(rv, ==, 0); - rv = fsm->snapshot(fsm, &bufs2, &n_bufs2); - munit_assert_int(rv, ==, RAFT_BUSY); - rv = fsm->snapshot_finalize(fsm, &bufs, &n_bufs); - munit_assert_int(rv, ==, 0); - - /* Second snapshot succeeds after first is finalized */ - rv = fsm->snapshot(fsm, &bufs2, &n_bufs2); - munit_assert_int(rv, ==, 0); - rv = fsm->snapshot_finalize(fsm, &bufs2, &n_bufs2); - munit_assert_int(rv, ==, 0); - - return MUNIT_OK; -} - TEST(fsm, snapshotRestoreMultipleDBs, setUp, tearDown, 0, snapshot_params) { struct fixture *f = data; @@ -468,6 +684,12 @@ TEST(fsm, snapshotRestoreMultipleDBs, setUp, tearDown, 0, snapshot_params) const char *msg; int rv; + bool disk_mode = false; + const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); + if (disk_mode_param != NULL) { + disk_mode = (bool)atoi(disk_mode_param); + } + /* Create 2 databases and add data to them. */ HANDSHAKE; OPEN_NAME("test"); @@ -487,6 +709,12 @@ TEST(fsm, snapshotRestoreMultipleDBs, setUp, tearDown, 0, snapshot_params) /* Snapshot both databases and restore the data. */ rv = fsm->snapshot(fsm, &bufs, &n_bufs); munit_assert_int(rv, ==, 0); + + if (disk_mode) { + rv = fsm->snapshot_async(fsm, &bufs, &n_bufs); + munit_assert_int(rv, ==, 0); + } + /* Copy the snapshot to restore it */ snapshot = n_bufs_to_buf(bufs, n_bufs); rv = fsm->snapshot_finalize(fsm, &bufs, &n_bufs);