Ingyu Bang edited this page Dec 27, 2021 · 1 revision

Project 6 Transaction Logging & Three-Pass Recovery

Latch Sequence

Before implementing logging and recovery, we need to change the latch sequence in the update operation. Originally, we did not hold the page latch while acquiring the lock, but that could cause problems. We therefore use the following latch sequence:

  1. Acquire the Page Latch
  2. Acquire the Lock Manager Latch
  3. Try to acquire the lock (go back to the root of the tree and sleep if failed)
  4. Release the Lock Manager Latch
  5. Create the Log Record
  6. Acquire the Log Buffer Latch
  7. Update the Log Buffer
  8. Release the Log Buffer Latch
  9. Release the Page Latch

With this ordering, the problems we were concerned about are resolved.
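The nine steps above can be sketched as follows. This is a minimal sketch, not the real implementation: the latch names, the placeholder lock/log functions, and the retry handling are all illustrative stand-ins for the actual buffer-manager, lock-manager, and log-manager objects.

```cpp
#include <pthread.h>
#include <cstdint>

// Hypothetical stand-ins for the real latches; names are illustrative only.
pthread_mutex_t page_latch = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lock_manager_latch = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t log_buffer_latch = PTHREAD_MUTEX_INITIALIZER;

// Placeholder: the real version may fail, causing a restart from the tree root.
bool try_acquire_record_lock() { return true; }
// Placeholder for appending the record to the log buffer; returns its LSN.
uint64_t append_log_record() { return 0; }

// Follows the nine steps: the page latch is held across lock acquisition
// and logging, so no other thread can slip in between.
bool update_record() {
    pthread_mutex_lock(&page_latch);            // 1. acquire page latch
    pthread_mutex_lock(&lock_manager_latch);    // 2. acquire lock manager latch
    bool got_lock = try_acquire_record_lock();  // 3. try to acquire the lock
    pthread_mutex_unlock(&lock_manager_latch);  // 4. release lock manager latch
    if (!got_lock) {
        pthread_mutex_unlock(&page_latch);      // failed: go back to the root
        return false;
    }
    // 5. create the log record (elided)
    pthread_mutex_lock(&log_buffer_latch);      // 6. acquire log buffer latch
    uint64_t lsn = append_log_record();         // 7. update the log buffer
    pthread_mutex_unlock(&log_buffer_latch);    // 8. release log buffer latch
    (void)lsn;                                  // would become the new page LSN
    pthread_mutex_unlock(&page_latch);          // 9. release page latch
    return true;
}
```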

Log Entry

The log entry is the record that is written to the log buffer. The log entry is defined as follows:

class log_entry_t{
public:
    char *data;
    log_entry_t();
    log_entry_t(int data_length, int compensate);
    log_entry_t(int size);
    ~log_entry_t();

    // getters
    // [CAUTION] does not check if data array is big enough
    int get_log_size() const;
    uint64_t get_lsn() const;
    uint64_t get_prev_lsn() const;
    int get_trx_id() const;
    int get_type() const;
    int64_t get_table_id() const;
    pagenum_t get_pagenum() const;
    uint16_t get_offset() const;
    uint16_t get_length() const;
    void get_old_image(char *dest) const; // caller allocates
    void get_new_image(char *dest) const; // caller allocates
    uint64_t get_next_undo_lsn() const;

    // setters
    void set_lsn(uint64_t lsn);
    void set_prev_lsn(uint64_t prev_lsn);
    void set_trx_id(int trx_id);
    void set_type(int type);
    void set_table_id(int64_t table_id);
    void set_pagenum(pagenum_t pagenum);
    void set_offset(uint16_t offset);
    void set_length(uint16_t length);
    void set_old_image(const char *src);
    void set_new_image(const char *src);
    void set_next_undo_lsn(uint64_t next_undo_lsn);
};

The three constructors serve different purposes:

  1. The default constructor, which creates a 28-byte entry.
  2. The constructor which creates an entry of 48 + 2*data_length bytes (+8 more if compensate == 1).
  3. The constructor which creates an entry of the given size (used when reading the log file into memory).

I implemented the log entry this way because a naive struct definition could end up with a different size depending on compiler settings (structure padding and alignment).
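The stated sizes (28 bytes for the base entry, 48 + 2*data_length for an update, +8 for the next-undo LSN of a compensate record) suggest a fixed byte layout like the one below. The offsets here are an assumption inferred from those sizes, not copied from the implementation; the idea is that getters and setters become memcpy calls at fixed offsets, so the serialized size never depends on compiler padding:

```cpp
#include <cstdint>
#include <cstring>

// Assumed offsets, inferred from the sizes 28 / 48 + 2*len / +8:
//   [0]  int      log_size
//   [4]  uint64_t lsn
//   [12] uint64_t prev_lsn
//   [20] int      trx_id
//   [24] int      type           -> 28 bytes total (begin/commit/rollback)
//   [28] int64_t  table_id
//   [36] uint64_t pagenum
//   [44] uint16_t offset
//   [46] uint16_t length         -> 48 bytes + old image + new image
//   [48 + 2*length] uint64_t next_undo_lsn (compensate only)
struct raw_log_entry {
    char* data;

    uint64_t get_lsn() const {
        uint64_t v;
        std::memcpy(&v, data + 4, sizeof(v)); // memcpy is unaligned-safe
        return v;
    }
    void set_lsn(uint64_t v) { std::memcpy(data + 4, &v, sizeof(v)); }

    int get_trx_id() const {
        int v;
        std::memcpy(&v, data + 20, sizeof(v));
        return v;
    }
    void set_trx_id(int v) { std::memcpy(data + 20, &v, sizeof(v)); }
};
```

Under this layout, a begin/commit/rollback record fills only the first five fields of a 28-byte buffer.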

Next, we have five different functions that create a log_entry.

log_entry_t* create_begin_log(int trx_id);
log_entry_t* create_update_log(int trx_id, int64_t table_id, pagenum_t pagenum, uint16_t offset, uint16_t length, const char *old, const char *new_);
log_entry_t* create_commit_log(int trx_id);
log_entry_t* create_rollback_log(int trx_id);
log_entry_t* create_compensate_log(int trx_id, int64_t table_id, pagenum_t pagenum, uint16_t offset, uint16_t length, const char *old, const char *new_, uint64_t next_undo_lsn);

These functions are implemented in the log_entry.cc file and are straightforward: each one creates the corresponding type of log entry (begin, update, commit, rollback, or compensate).

After we create the log entry, we add it to the buffer using add_to_log_buffer(). This function is protected by the log_buffer_mutex. It assigns an LSN to the log_entry and returns that LSN to the caller, who can use it to update the page LSN or the transaction table.
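A sketch of how add_to_log_buffer() could work, consistent with the LSNs in the recovery logs below (each LSN is the record's byte offset in the log file). The entry struct, the buffer capacity, and the flush stand-in are assumptions, not the real implementation:

```cpp
#include <pthread.h>
#include <cstdint>
#include <vector>

struct entry { uint64_t lsn; int size; }; // simplified stand-in for log_entry_t

pthread_mutex_t log_buffer_mutex = PTHREAD_MUTEX_INITIALIZER;
std::vector<entry*> log_buffer;
uint64_t next_lsn = 0;                   // next LSN to hand out
const size_t LOG_BUFFER_CAPACITY = 128;  // assumed capacity

void _log_flush_sketch() { log_buffer.clear(); } // stand-in for write + fflush

// Assigns the entry's LSN under the mutex and returns it to the caller,
// flushing in place (without re-locking) when the buffer is full.
uint64_t add_to_log_buffer(entry* e) {
    pthread_mutex_lock(&log_buffer_mutex);
    e->lsn = next_lsn;           // LSN = byte offset of the record in the log
    uint64_t lsn = e->lsn;
    next_lsn += e->size;
    log_buffer.push_back(e);
    if (log_buffer.size() >= LOG_BUFFER_CAPACITY)
        _log_flush_sketch();     // safe: we already hold the mutex
    pthread_mutex_unlock(&log_buffer_mutex);
    return lsn;
}
```

This shows why the flush helper must be callable while the mutex is already held, which is exactly the _log_flush() split described below.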

void log_write(log_entry_t* log) {
    int sz = log->get_log_size();
    for (int i = 0; i < sz; i++) {
        fprintf(log_file, "%c", log->data[i]);
    }
    // fwrite(const_cast<char*>(log->data), 1, log->get_log_size(), log_file);
}

void _log_flush() {
    for (int i = 0; i < log_buffer.size(); i++) {
        log_entry_t* log = log_buffer[i];
        log_write(log);
    }
    log_buffer.clear();
    fflush(log_file);
}

void log_flush() {
    pthread_mutex_lock(&log_buffer_mutex);
    _log_flush();
    pthread_mutex_unlock(&log_buffer_mutex);
}

Two functions are defined to write the buffer to the file. Note that log_flush() is protected by the log_buffer_mutex and calls the unprotected function _log_flush(). _log_flush() is also called from add_to_log_buffer() when the buffer is full; it is split this way so that add_to_log_buffer() does not have to release the mutex before flushing.

Three-Pass Recovery

The code snippets below are not identical to the real implementation; some lines have been removed for brevity.

// Analysis Pass
std::set<int> winners;
std::map<int, uint64_t> losers;

fprintf(logmsg_file, "[ANALYSIS] Analysis pass start\n");
while (true) {
    int sz;
    if (fread(&sz, sizeof(int), 1, log_file) != 1) break;
    log_entry_t* log = new log_entry_t(sz);
    fseek(log_file, -(long)sizeof(int), SEEK_CUR); // seek back over the size field (cast: sizeof is unsigned)
    fread(log->data, 1, sz, log_file);
    losers[log->get_trx_id()] = log->get_lsn();
    if (log->get_type() == LOG_COMMIT || log->get_type() == LOG_ROLLBACK) {
        winners.insert(log->get_trx_id());
        losers.erase(log->get_trx_id());
    }
    next_lsn += log->get_log_size();

    delete log;
}

In the analysis pass, the recovery module reads the log file and finds the winners and losers. A winner is a transaction that has committed or rolled back; a loser is a transaction that has done neither.
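The winner/loser bookkeeping reduces to a single scan over (trx_id, type, lsn) triples. A minimal sketch, where the record struct and the LOG_* values are illustrative stand-ins for the real ones:

```cpp
#include <cstdint>
#include <map>
#include <set>
#include <utility>
#include <vector>

// Illustrative stand-ins for the real log type constants.
enum { LOG_BEGIN = 0, LOG_UPDATE = 1, LOG_COMMIT = 2,
       LOG_ROLLBACK = 3, LOG_COMPENSATE = 4 };

struct rec { int trx_id; int type; uint64_t lsn; };

// Every record tentatively marks its transaction as a loser with its latest
// LSN; a COMMIT or ROLLBACK record promotes the transaction to winner.
std::pair<std::set<int>, std::map<int, uint64_t>>
analyze(const std::vector<rec>& log) {
    std::set<int> winners;
    std::map<int, uint64_t> losers; // trx_id -> LSN of its last record
    for (const rec& r : log) {
        losers[r.trx_id] = r.lsn;
        if (r.type == LOG_COMMIT || r.type == LOG_ROLLBACK) {
            winners.insert(r.trx_id);
            losers.erase(r.trx_id);
        }
    }
    return {winners, losers};
}
```

The losers map deliberately remembers each loser's last LSN: that is the starting point for the undo pass.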

fprintf(logmsg_file, "[REDO] Redo pass start\n");
rewind(log_file);

int redo = 0;
while (flag != 1 || redo < log_num) { // flag/log_num: crash-simulation controls (setup elided)
    redo++;
    int sz;
    if (fread(&sz, sizeof(int), 1, log_file) != 1) break;
    log_entry_t* log = new log_entry_t(sz);
    fseek(log_file, -(long)sizeof(int), SEEK_CUR); // seek back over the size field (cast: sizeof is unsigned)
    fread(log->data, 1, sz, log_file);

    if (log->get_type() == LOG_UPDATE || log->get_type() == LOG_COMPENSATE) {
        int64_t table_id = log->get_table_id();
        if (opened_tables.find(table_id) == opened_tables.end()) {
            opened_tables.insert(table_id);
            std::string filename = "DATA";
            filename += std::to_string(table_id);
            open_table(const_cast<char*>(filename.c_str()));
        }

        control_block_t* ctrl_block = buf_read_page(table_id, log->get_pagenum());
        if (PageIO::BPT::get_page_lsn(ctrl_block->frame) < log->get_lsn()) {
            fprintf(logmsg_file, "LSN %lu [UPDATE] Transaction id %d redo apply\n", log->get_lsn(), log->get_trx_id());
            PageIO::BPT::set_page_lsn(ctrl_block->frame, log->get_lsn());
            char* buffer = new char[log->get_length()];
            log->get_new_image(buffer);
            ctrl_block->frame->set_data(const_cast<const char*>(buffer), log->get_offset(), log->get_length());
            buf_return_ctrl_block(&ctrl_block, 1);
            delete[] buffer;
        } else {
            buf_return_ctrl_block(&ctrl_block);
        }
    }

    delete log;
}

In the redo pass, the module reads the log file from the beginning and redoes the log entries, applying them to the data pages. If the page LSN is already greater than or equal to the log entry's LSN, the entry is skipped.
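This page-LSN check is what makes redo idempotent: applying the same record twice is a no-op the second time. A minimal model of the rule, with a hypothetical page type standing in for the real frame:

```cpp
#include <cstdint>

// Hypothetical stand-in for a buffer frame: one LSN plus one value.
struct page { uint64_t page_lsn = 0; int value = 0; };

// Returns true if the record was applied ("redo apply"),
// false if the page already reflects it and the record is skipped.
bool redo(page& p, uint64_t log_lsn, int new_value) {
    if (p.page_lsn < log_lsn) {   // page is older than the record: apply it
        p.value = new_value;
        p.page_lsn = log_lsn;     // record the LSN so a re-run skips it
        return true;
    }
    return false;                 // page LSN >= record LSN: already applied
}
```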

fprintf(logmsg_file, "[UNDO] Undo pass start\n");
int undo = 0;
while (losers.size() > 0 && (flag != 2 || undo < log_num)) { // flag/log_num: crash-simulation controls (setup elided)
    undo++;
    int next_trx;
    uint64_t next_entry = 0;
    for (auto& x : losers) {
        if (next_entry < x.second) {
            next_trx = x.first;
            next_entry = x.second;
        }
        if (trx_table.find(x.first) == trx_table.end()) trx_resurrect(x.first, x.second);
    }
    fseek(log_file, next_entry, SEEK_SET);
    int sz;
    fread(&sz, sizeof(int), 1, log_file);
    log_entry_t* log = new log_entry_t(sz);
    fseek(log_file, -(long)sizeof(int), SEEK_CUR); // seek back over the size field (cast: sizeof is unsigned)
    fread(log->data, 1, sz, log_file);
    
    if(log->get_type() == LOG_COMPENSATE){
        fprintf(logmsg_file, "LSN %lu [CLR] Transaction id %d\n", log->get_lsn(), log->get_trx_id());
        losers[log->get_trx_id()] = log->get_next_undo_lsn();
    }

    if (log->get_type() == LOG_UPDATE) {
        control_block_t* ctrl_block = buf_read_page(log->get_table_id(), log->get_pagenum());
        if (PageIO::BPT::get_page_lsn(ctrl_block->frame) >= log->get_lsn()) {
            fprintf(logmsg_file, "LSN %lu [UPDATE] Transaction id %d undo apply\n", log->get_lsn(), log->get_trx_id());
            
            // old, new_ and length are obtained from log->get_old_image(),
            // log->get_new_image() and log->get_length(); extraction code elided.
            log_entry_t* new_log = create_compensate_log(log->get_trx_id(), log->get_table_id(), log->get_pagenum(), log->get_offset(), log->get_length(), new_, old, log->get_prev_lsn());
            uint64_t new_lsn = add_to_log_buffer(new_log);

            PageIO::BPT::set_page_lsn(ctrl_block->frame, new_lsn);
            ctrl_block->frame->set_data(const_cast<const char*>(old), log->get_offset(), length);
            buf_return_ctrl_block(&ctrl_block, 1);
            delete[] old;  // note: `delete[] old, new_;` only deletes old (comma operator)
            delete[] new_;
        } else {
            buf_return_ctrl_block(&ctrl_block);
        }
        losers[log->get_trx_id()] = log->get_prev_lsn();
    } else if (log->get_type() == LOG_BEGIN) {
        fprintf(logmsg_file, "LSN %lu [BEGIN] Transaction id %d\n", log->get_lsn(), log->get_trx_id());
        log_entry_t* entry = create_rollback_log(log->get_trx_id());
        uint64_t lsn = add_to_log_buffer(entry);
        losers[log->get_trx_id()] = lsn;
        trx_remove(log->get_trx_id());
        losers.erase(log->get_trx_id());
    }

    delete log;
}

The undo pass is a bit more complicated than the redo pass. It walks the losers' log records in reverse LSN order and undoes each update. For every undone update it creates a compensate log entry and adds it to the log buffer; this prevents the update from being undone again after another crash (redoing a compensate log has the same effect as the undo).
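"Reverse order" across several losers means: at each step, pick the pending loser record with the largest LSN, undo it, then move that transaction's cursor back along its prev_lsn chain (or jump via next_undo_lsn for a compensate record). A sketch of just the selection step, with assumed types; unlike the snippet above it uses >= so that a record at LSN 0 is still selectable:

```cpp
#include <cstdint>
#include <map>
#include <utility>

// Picks the next record to undo: the loser transaction whose pending
// log record has the largest LSN. Returns {trx_id, lsn}.
std::pair<int, uint64_t> pick_next_undo(const std::map<int, uint64_t>& losers) {
    int next_trx = -1;
    uint64_t next_lsn = 0;
    for (const auto& x : losers) {
        if (x.second >= next_lsn) { // >= so a record at LSN 0 can be picked
            next_trx = x.first;
            next_lsn = x.second;
        }
    }
    return {next_trx, next_lsn};
}
```

After undoing the picked record, the caller replaces losers[trx_id] with prev_lsn (or next_undo_lsn) and repeats until the map is empty.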

Simple Test (Local)

I was not able to create a full test, but here is what I have:

TEST(RecoveryTest, LogCreationTest){
    EXPECT_EQ(init_db(BUF_SIZE, 0, 0, "logfile.data", "logmsg.txt"), 0);

    int table_id = open_table("DATA1");

    int n = 300;

    for (int i = 1; i <= n; i++) {
        std::string data = "01234567890123456789012345678901234567890123456789" + std::to_string(i);
        int res = db_insert(table_id, i, const_cast<char*>(data.c_str()), data.length());
        EXPECT_EQ(res, 0);
    }

    int trx_id = trx_begin();
    EXPECT_GT(trx_id, 0);

    int err = 0;
    uint16_t val_size;
    for (int i = 1; i <= n; i++) {
        char ret_val[112];
        int res = db_find(table_id, i, ret_val, &val_size, trx_id);
        EXPECT_EQ(res, 0);
    }

    trx_commit(trx_id);

    trx_id = trx_begin();
    EXPECT_GT(trx_id, 0);

    for (int i = 1; i <= n; i++) {
        std::string data = "12345678901234567890123456789012345678901234567890" + std::to_string(i);
        uint16_t old_val_size = 0;
        int res = db_update(table_id, i, const_cast<char*>(data.c_str()), data.length(), &old_val_size, trx_id);
        EXPECT_EQ(res, 0);
    }
    trx_commit(trx_id);

    trx_id = trx_begin();
    EXPECT_GT(trx_id, 0);

    for (int i = 1; i <= n; i++) {
        std::string data = "01234567890123456789012345678901234567890123456789" + std::to_string(i);
        uint16_t old_val_size = 0;
        int res = db_update(table_id, i, const_cast<char*>(data.c_str()), data.length(), &old_val_size, trx_id);
        EXPECT_EQ(res, 0);
    }

    trx_id = trx_begin();
    EXPECT_GT(trx_id, 0);

    trx_commit(trx_id);

    //crash before commit
}

In the test, there are 4 transactions. The first transaction finds records. The second transaction updates records. The third transaction updates records again, but never commits. The fourth transaction does nothing and commits.

In the first run of the test, there should be no errors, and the recovery passes should find no transactions to process, because there is no log file at the beginning:

[ANALYSIS] Analysis pass start
[ANALYSIS] Analysis success. Winner: , Loser: 
[REDO] Redo pass start
[REDO] Redo pass end
[UNDO] Undo pass start
[UNDO] Undo pass end

In the second run of the test, the recovery module should find the winners 1, 2, and 4 and the loser 3. It should then undo trx 3's updates:

[ANALYSIS] Analysis pass start
[ANALYSIS] Analysis success. Winner: 1 2 4, Loser: 3
[REDO] Redo pass start
LSN 0 [BEGIN] Transaction id 1
LSN 28 [COMMIT] Transaction id 1
LSN 56 [BEGIN] Transaction id 2
LSN 84 [UPDATE] Transaction id 2 redo apply
LSN 234 [UPDATE] Transaction id 2 redo apply
LSN 384 [UPDATE] Transaction id 2 redo apply
LSN 534 [COMMIT] Transaction id 2
LSN 562 [BEGIN] Transaction id 3
LSN 590 [UPDATE] Transaction id 3 redo apply
LSN 740 [UPDATE] Transaction id 3 redo apply
LSN 890 [UPDATE] Transaction id 3 redo apply
LSN 1040 [BEGIN] Transaction id 4
LSN 1068 [COMMIT] Transaction id 4
[REDO] Redo pass end
[UNDO] Undo pass start
LSN 890 [UPDATE] Transaction id 3 undo apply
LSN 740 [UPDATE] Transaction id 3 undo apply
LSN 590 [UPDATE] Transaction id 3 undo apply
LSN 562 [BEGIN] Transaction id 3
[UNDO] Undo pass end

There are no CONSIDER-REDOs because the page was never flushed from the buffer, and the actual DB file was never updated.

After running the same test three times, the log is:

[ANALYSIS] Analysis pass start
[ANALYSIS] Analysis success. Winner: 1 2 3 4 5 6 7 8 9 10 12, Loser: 11
[REDO] Redo pass start
LSN 0 [BEGIN] Transaction id 1
LSN 28 [COMMIT] Transaction id 1
LSN 56 [BEGIN] Transaction id 2
...
LSN 3580 [UPDATE] Transaction id 10 redo apply
LSN 3730 [COMMIT] Transaction id 10
LSN 3758 [BEGIN] Transaction id 11
LSN 3786 [UPDATE] Transaction id 11 redo apply
LSN 3936 [UPDATE] Transaction id 11 redo apply
LSN 4086 [UPDATE] Transaction id 11 redo apply
LSN 4236 [BEGIN] Transaction id 12
LSN 4264 [COMMIT] Transaction id 12
[REDO] Redo pass end
[UNDO] Undo pass start
LSN 4086 [UPDATE] Transaction id 11 undo apply
LSN 3936 [UPDATE] Transaction id 11 undo apply
LSN 3786 [UPDATE] Transaction id 11 undo apply
LSN 3758 [BEGIN] Transaction id 11
[UNDO] Undo pass end

We see that trx 3 and 7 are considered winners because they were already undone in the earlier runs (a rollback record was written for each during recovery).