diff --git a/src/dbtest/src/mda_generate.py b/src/dbtest/src/mda_generate.py index c10768a7..a9c4e3c8 100644 --- a/src/dbtest/src/mda_generate.py +++ b/src/dbtest/src/mda_generate.py @@ -9,6 +9,7 @@ # * # */ +import networkx as nx from operator import truediv import os import sys @@ -51,15 +52,17 @@ def __init__(self, op_type, txn_num, op_num): Returns: int: The data_num value, which depends on the database type and determines the number of data columns in each table. -This function initializes tables for database testing by generating SQL statements and writing them to the specified -file (`file_name`). The SQL statements include DROP TABLE IF EXISTS and CREATE TABLE statements for the specified +This function initializes tables for database testing by generating SQL statements and writing them to the specified +file (`file_name`). The SQL statements include DROP TABLE IF EXISTS and CREATE TABLE statements for the specified number of tables (`table_num`). -The function takes into account the `db_type` and `test_type` to determine the structure of the created tables and -the number of data columns. It returns the `data_num` value, which is an integer that depends on the database type +The function takes into account the `db_type` and `test_type` to determine the structure of the created tables and +the number of data columns. It returns the `data_num` value, which is an integer that depends on the database type and determines the number of data columns in each table. """ + + def init_table(file_name, sql_count, txn_count, table_num, db_type, test_type): data_num = 2 with open(file_name, "a+") as file_test: @@ -70,30 +73,30 @@ def init_table(file_name, sql_count, txn_count, table_num, db_type, test_type): for i in range(1, table_num + 1): # MySQL 5.1 add InnoDB for table # create_sql = str(sql_count) + "-" + str(txn_count) + "-" + "CREATE TABLE t" + str(i) + \ - # " (k INT PRIMARY KEY, v INT) ENGINE=InnoDB;\n" + # " (k INT PRIMARY KEY, v INT) ENGINE=InnoDB;\n" create_sql = str(sql_count) + "-" + str(txn_count) + "-" + "CREATE TABLE t" + str(i) + \ - " (k INT PRIMARY KEY, v INT);\n" + " (k INT PRIMARY KEY, v INT);\n" file_test.write(create_sql) elif db_type == "tdsql" or db_type == "ob_oracle": data_num = 4 for i in range(1, table_num + 1): create_sql = str(sql_count) + "-" + str(txn_count) + "-" + "CREATE TABLE t" + str(i) + \ - " (k INT, v INT, value1 INT, value2 INT, PRIMARY KEY (v,k)) PARTITION BY RANGE(v) " \ - "(PARTITION p0 VALUES LESS THAN(2), PARTITION p1 VALUES LESS THAN(4));\n" + " (k INT, v INT, value1 INT, value2 INT, PRIMARY KEY (v,k)) PARTITION BY RANGE(v) " \ + "(PARTITION p0 VALUES LESS THAN(2), PARTITION p1 VALUES LESS THAN(4));\n" file_test.write(create_sql) elif db_type == "crdb": data_num = 4 for i in range(1, table_num + 1): create_sql = str(sql_count) + "-" + str(txn_count) + "-" + "CREATE TABLE t" + str(i) + \ - " (k INT, v INT, value1 INT, value2 INT, PRIMARY KEY (v,k)) PARTITION BY RANGE(v) " \ - "(PARTITION p0 VALUES FROM (MINVALUE) TO (2), " \ - "PARTITION p1 VALUES FROM (2) TO (MAXVALUE));\n" + " (k INT, v INT, value1 INT, value2 INT, PRIMARY KEY (v,k)) PARTITION BY RANGE(v) " \ + "(PARTITION p0 VALUES FROM (MINVALUE) TO (2), " \ + "PARTITION p1 VALUES FROM (2) TO (MAXVALUE));\n" file_test.write(create_sql) else: for i in range(1, table_num + 1): create_sql = str(sql_count) + "-" + str(txn_count) + "-" + "CREATE TABLE t" + str(i) + \ - " (k INT, v INT) PARTITION BY RANGE(v) (PARTITION p0 VALUES LESS THAN(2), " \ - "PARTITION p1 VALUES LESS THAN(4));\n" + " (k INT, v INT) PARTITION BY RANGE(v) (PARTITION p0 VALUES LESS THAN(2), " \ + "PARTITION p1 VALUES LESS THAN(4));\n" file_test.write(create_sql) return data_num @@ -112,6 +115,8 @@ def init_table(file_name, sql_count, txn_count, table_num, db_type, test_type): # if both transactions are running # or the start time of the second transaction is less than the end time of the first transaction # we think they are concurrent + + def check_concurrency(txn_num1, txn_num2, txn): if txn[txn_num2].begin_ts < txn[txn_num1].end_ts: return True @@ -134,14 +139,16 @@ def check_concurrency(txn_num1, txn_num2, txn): Returns: bool: True if the specified operation type exists in the transaction and is concurrent; False otherwise. -This function checks if a specific operation type (`op`) exists in a transaction (`txn`) by examining -the list of data operations (`data_op_list`) associated with that operation number (`op_num`). If the -specified operation type exists in the transaction and is concurrent with other transactions, the +This function checks if a specific operation type (`op`) exists in a transaction (`txn`) by examining +the list of data operations (`data_op_list`) associated with that operation number (`op_num`). If the +specified operation type exists in the transaction and is concurrent with other transactions, the function returns True; otherwise, it returns False. The function is designed to help identify and handle concurrent operations in a transactional context. """ + + def check_exist_op(txn, data_op_list, op, op_num, txn_count): flag, txn_num = False, 0 for data in data_op_list[op_num]: @@ -173,8 +180,8 @@ def check_exist_op(txn, data_op_list, op, op_num, txn_count): Returns: int: The updated count of SQL operations after execution. -This function executes an SQL operation within a transaction context. It takes into account whether the -operation is a predicate operation (IsPredicate flag) and writes the SQL operation to the specified file. +This function executes an SQL operation within a transaction context. It takes into account whether the +operation is a predicate operation (IsPredicate flag) and writes the SQL operation to the specified file. The function also updates the SQL operation count and performs the necessary actions based on the type of SQL operation. @@ -190,6 +197,8 @@ def check_exist_op(txn, data_op_list, op, op_num, txn_count): The function returns the updated count of SQL operations after execution. """ + + def execute_sql(IsPredicate, file_name, sql_count, txn_count, op_num, op, data_num, txn, data_value, data_op_list): # if check_exist_op(txn, data_op_list, op, op_num, txn_count): # return sql_count @@ -241,11 +250,13 @@ def execute_sql(IsPredicate, file_name, sql_count, txn_count, op_num, op, data_n The function returns the updated count of SQL operations after the insert. """ + + def insert_data(file_name, sql_count, txn_count, cur_count, partition_num, insert_table, data_num, exist, data_value): with open(file_name, "a+") as file_test: try: - #if exist[cur_count]: + # if exist[cur_count]: if False: raise OptionException else: @@ -258,11 +269,11 @@ def insert_data(file_name, sql_count, txn_count, cur_count, partition_num, inser exist[cur_count] = True if data_num == 2: insert_sql = str(sql_count) + "-" + str(txn_count) + "-" + "INSERT INTO t" + \ - str(insert_table) + " VALUES (" + str(cur_count) + "," + str(cur_count) + ");\n" + str(insert_table) + " VALUES (" + str(cur_count) + "," + str(cur_count) + ");\n" else: insert_sql = str(sql_count) + "-" + str(txn_count) + "-" + "INSERT INTO t" + \ - str(insert_table) + " VALUES (" + str(cur_count) + "," + str(partition_num) + \ - "," + str(cur_count) + "," + str(cur_count) + ");\n" + str(insert_table) + " VALUES (" + str(cur_count) + "," + str(partition_num) + \ + "," + str(cur_count) + "," + str(cur_count) + ");\n" file_test.write(insert_sql) data_value[cur_count] = cur_count except OptionException: @@ -292,14 +303,16 @@ def insert_data(file_name, sql_count, txn_count, cur_count, partition_num, inser Returns: int: The updated count of SQL operations after the delete. -This function deletes data from a table within a transaction context. It generates an SQL delete statement -based on the provided parameters and writes the statement to the specified file. The function also updates -the SQL operation count, manages the existence of data elements, and records the delete operation in the +This function deletes data from a table within a transaction context. It generates an SQL delete statement +based on the provided parameters and writes the statement to the specified file. The function also updates +the SQL operation count, manages the existence of data elements, and records the delete operation in the data operation list. The function returns the updated count of SQL operations after the delete. """ + + def delete_data(file_name, sql_count, txn_count, cur_count, delete_table, data_num, exist, txn, data_op_list): with open(file_name, "a+") as file_test: try: @@ -314,10 +327,10 @@ def delete_data(file_name, sql_count, txn_count, cur_count, delete_table, data_n exist[cur_count] = False if data_num == 2: delete_sql = str(sql_count) + "-" + str(txn_count) + "-" + "DELETE FROM t" + \ - str(delete_table) + " WHERE k=" + str(cur_count) + ";\n" + str(delete_table) + " WHERE k=" + str(cur_count) + ";\n" else: delete_sql = str(sql_count) + "-" + str(txn_count) + "-" + "DELETE FROM t" + \ - str(delete_table) + " WHERE value1=" + str(cur_count) + ";\n" + str(delete_table) + " WHERE value1=" + str(cur_count) + ";\n" file_test.write(delete_sql) data_op_list[cur_count].append(Operation("D", txn_count)) except OptionException: @@ -343,7 +356,7 @@ def delete_data(file_name, sql_count, txn_count, cur_count, delete_table, data_n int: The updated count of SQL operations after the write. This function writes data to a table within a transaction context, incrementing its value by 1. It generates -an SQL update statement based on the provided parameters and writes the statement to the specified file. +an SQL update statement based on the provided parameters and writes the statement to the specified file. The function also updates the SQL operation count, increments the data value, and records the write operation in the data operation list. @@ -351,10 +364,12 @@ def delete_data(file_name, sql_count, txn_count, cur_count, delete_table, data_n """ # when updating data, increment its value by 1 + + def write_data(file_name, sql_count, txn_count, op_num, data_num, txn, data_value, data_op_list): with open(file_name, "a+") as file_test: try: - #if not exist[op_num] or txn[txn_count].end_ts != max_time: + # if not exist[op_num] or txn[txn_count].end_ts != max_time: if txn[txn_count].end_ts != max_time: raise OptionException else: @@ -365,11 +380,11 @@ def write_data(file_name, sql_count, txn_count, op_num, data_num, txn, data_valu sql_count += 1 if data_num == 2: write_sql = str(sql_count) + "-" + str(txn_count) + "-" + "UPDATE t1 SET v=" + \ - str(data_value[op_num] + 1) + " WHERE k=" + str(op_num) + ";\n" + str(data_value[op_num] + 1) + " WHERE k=" + str(op_num) + ";\n" else: write_sql = str(sql_count) + "-" + str(txn_count) + "-" + "UPDATE t" + str(txn_count) + \ - " SET value2=" + str(data_value[op_num] + 1) + " WHERE value1=" + \ - str(op_num) + ";\n" + " SET value2=" + str(data_value[op_num] + 1) + " WHERE value1=" + \ + str(op_num) + ";\n" file_test.write(write_sql) data_op_list[op_num].append(Operation("W", txn_count)) data_value[op_num] += 1 @@ -394,13 +409,15 @@ def write_data(file_name, sql_count, txn_count, op_num, data_num, txn, data_valu Returns: int: The updated count of SQL operations after the read. -This function reads data from a table within a transaction context. It generates an SQL select statement +This function reads data from a table within a transaction context. It generates an SQL select statement based on the provided parameters and writes the statement to the specified file. The function also updates the SQL operation count and records the read operation in the data operation list. The function returns the updated count of SQL operations after the read. """ + + def read_data(file_name, sql_count, txn_count, op_num, data_num, txn, data_op_list): with open(file_name, "a+") as file_test: try: @@ -414,10 +431,10 @@ def read_data(file_name, sql_count, txn_count, op_num, data_num, txn, data_op_li sql_count += 1 if data_num == 2: read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t1 WHERE k=" + \ - str(op_num) + ";\n" + str(op_num) + ";\n" else: read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t" + str(txn_count) + \ - " WHERE value1=" + str(op_num) + ";\n" + " WHERE value1=" + str(op_num) + ";\n" file_test.write(read_sql) data_op_list[op_num].append(Operation("R", txn_count)) except OptionException: @@ -442,12 +459,14 @@ def read_data(file_name, sql_count, txn_count, op_num, data_num, txn, data_op_li int: The updated count of SQL operations after the read. This function reads data from a table within a transaction context with a predicate (range condition). It generates -an SQL select statement based on the provided parameters and writes the statement to the specified file. +an SQL select statement based on the provided parameters and writes the statement to the specified file. The function also updates the SQL operation count and records the read operation in the data operation list. The function returns the updated count of SQL operations after the read. """ + + def read_data_predicate(file_name, sql_count, txn_count, op_num, data_num, txn, data_op_list): with open(file_name, "a+") as file_test: try: @@ -461,10 +480,10 @@ def read_data_predicate(file_name, sql_count, txn_count, op_num, data_num, txn, sql_count += 1 if data_num == 2: read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t1 WHERE k>" + \ - str(op_num*2) + " and k<" + str(op_num*2+2) + ";\n" + str(op_num*2) + " and k<" + str(op_num*2+2) + ";\n" else: read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t" + str(txn_count) + \ - " WHERE value1>" + str(op_num*2) + " and value1<" + str(op_num*2+2) + ";\n" + " WHERE value1>" + str(op_num*2) + " and value1<" + str(op_num*2+2) + ";\n" file_test.write(read_sql) data_op_list[op_num].append(Operation("P", txn_count)) except OptionException: @@ -485,13 +504,15 @@ def read_data_predicate(file_name, sql_count, txn_count, op_num, data_num, txn, Returns: int: The updated count of SQL operations after the rollback. -This function aborts (rolls back) a transaction by generating an SQL rollback statement based on the provided -parameters and writes the statement to the specified file. It updates the SQL operation count and marks the +This function aborts (rolls back) a transaction by generating an SQL rollback statement based on the provided +parameters and writes the statement to the specified file. It updates the SQL operation count and marks the transaction as ended. If the transaction has already ended, it logs an error message. The function returns the updated count of SQL operations after the rollback. """ + + def abort_txn(file_name, sql_count, txn_count, txn): with open(file_name, "a+") as file_test: try: @@ -520,12 +541,14 @@ def abort_txn(file_name, sql_count, txn_count, txn): int: The updated count of SQL operations after the commit. This function commits a transaction by generating an SQL commit statement based on the provided parameters and -writes the statement to the specified file. It updates the SQL operation count and marks the transaction as ended. +writes the statement to the specified file. It updates the SQL operation count and marks the transaction as ended. If the transaction has already ended, it logs an error message. The function returns the updated count of SQL operations after the commit. """ + + def commit_txn(file_name, sql_count, txn_count, txn): with open(file_name, "a+") as file_test: try: @@ -551,11 +574,13 @@ def commit_txn(file_name, sql_count, txn_count, txn): - data_num (int): The number of data columns in each table. - table_num (int): The total number of tables. -This function generates and executes a check transaction. It begins the transaction, performs ordered SELECT +This function generates and executes a check transaction. It begins the transaction, performs ordered SELECT queries on all tables to read data, and then commits the transaction. The generated SQL statements are written to the specified file. """ + + def execute_check(file_name, sql_count, txn_count, data_num, table_num): with open(file_name, "a+") as file_test: begin_sql = str(sql_count) + "-" + str(txn_count) + "-" + "BEGIN;\n" @@ -566,13 +591,14 @@ def execute_check(file_name, sql_count, txn_count, data_num, table_num): read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t1 ORDER BY k;\n" else: read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t" + str(i) + \ - " ORDER BY k;\n" + " ORDER BY k;\n" file_test.write(read_sql) sql_count += 1 commit_sql = str(sql_count) + "-" + str(txn_count) + "-" + "COMMIT;\n" file_test.write(commit_sql) sql_count += 1 + """ Check the last operation before the current position in a list of operations. @@ -584,12 +610,14 @@ def execute_check(file_name, sql_count, txn_count, data_num, table_num): - last_op_type (str): The type of the last operation before the current position. - last_txn_num (int): The transaction number of the last operation before the current position. -This function examines the list of operations and checks the last operation that occurred before the current position. -It returns the type of that operation (e.g., "C" for commit, "A" for abort, or a specific operation type) and the +This function examines the list of operations and checks the last operation that occurred before the current position. +It returns the type of that operation (e.g., "C" for commit, "A" for abort, or a specific operation type) and the corresponding transaction number. """ # check if the txn commit/abort before or not + + def get_last_op(ops, pos): if pos == 0: return -1, -1 @@ -598,6 +626,7 @@ def get_last_op(ops, pos): else: return ops[pos-1][1], int(ops[pos-1][2]) + """ Process the first operation of a Partial Order Pair (POP) for a given transaction. @@ -617,12 +646,14 @@ def get_last_op(ops, pos): - sql_count (int): The updated count of SQL queries after processing the first operation of the POP. This function processes the first operation of a Partial Order Pair (POP) for a given transaction. It checks whether -the last operation before the current operation has the same type and number. If not, it executes the SQL query for -the current operation, updates the SQL query count, and advances the transaction count. The function returns the +the last operation before the current operation has the same type and number. If not, it executes the SQL query for +the current operation, updates the SQL query count, and advances the transaction count. The function returns the updated SQL query count. """ # process first operation of a POP + + def execute_first(IsPredicate, num, ops, file_name, sql_count, txn_count, data_num, txn, data_value, data_op_list): for i in range(num): op1 = ops[i][0] @@ -634,12 +665,15 @@ def execute_first(IsPredicate, num, ops, file_name, sql_count, txn_count, data_n last_op_type, last_op_num = get_last_op(ops, i) if last_op_num == op_num and last_op_type == op1: return sql_count - sql_count = execute_sql(IsPredicate, file_name, sql_count, txn_count, op_num, op1, data_num, txn, data_value, data_op_list) + sql_count = execute_sql(IsPredicate, file_name, sql_count, txn_count, + op_num, op1, data_num, txn, data_value, data_op_list) sql_count += 1 txn_count += 1 - if txn_count == num + 1: txn_count = 1 + if txn_count == num + 1: + txn_count = 1 return sql_count + """ Process the second operation of a Partial Order Pair (POP) for a given transaction. @@ -659,13 +693,15 @@ def execute_first(IsPredicate, num, ops, file_name, sql_count, txn_count, data_n Returns: - sql_count (int): The updated count of SQL queries after processing the second operation of the POP. -This function processes the second operation of a Partial Order Pair (POP) for a given transaction. It iterates -through the list of operations, executes the SQL queries for each operation, updates the SQL query count, and -advances the transaction count. If an operation requires an "AC" (Abort or Commit) operation, it is also executed. +This function processes the second operation of a Partial Order Pair (POP) for a given transaction. It iterates +through the list of operations, executes the SQL queries for each operation, updates the SQL query count, and +advances the transaction count. If an operation requires an "AC" (Abort or Commit) operation, it is also executed. The function returns the updated SQL query count. """ # process second operation of a POP + + def execute_second(IsPredicate, num, ops, need_ac, file_name, sql_count, txn_count, data_num, txn, data_value, data_op_list): for i in range(num): if ops[i][1] == "C" or ops[i][1] == "A": @@ -674,15 +710,17 @@ def execute_second(IsPredicate, num, ops, need_ac, file_name, sql_count, txn_cou else: op2 = ops[i][1] op_num = int(ops[i][2:]) - sql_count = execute_sql(IsPredicate, file_name, sql_count, txn_count, op_num, op2, data_num, txn, data_value, data_op_list) + sql_count = execute_sql(IsPredicate, file_name, sql_count, txn_count, + op_num, op2, data_num, txn, data_value, data_op_list) sql_count += 1 if need_ac[txn_count] != "NO": sql_count = execute_sql(IsPredicate, file_name, sql_count, txn_count, i, need_ac[txn_count], data_num, txn, data_value, data_op_list) sql_count += 1 txn_count += 1 - if txn_count == num + 1: txn_count = 1 - return sql_count + if txn_count == num + 1: + txn_count = 1 + return sql_count """ @@ -704,8 +742,8 @@ def execute_second(IsPredicate, num, ops, need_ac, file_name, sql_count, txn_cou Returns: - sql_count (int): The updated count of SQL queries after executing the transaction. -This function executes a transaction while considering the order of operations for conflict resolution. -It iterates through the list of operations, executes SQL queries, and manages the execution order to +This function executes a transaction while considering the order of operations for conflict resolution. +It iterates through the list of operations, executes SQL queries, and manages the execution order to resolve conflicts. The function returns the updated SQL query count. """ @@ -714,7 +752,9 @@ def execute_second(IsPredicate, num, ops, need_ac, file_name, sql_count, txn_cou # otherwise, execute in the original order # such as RW0-RW1, we will execute as order R1[x]-R2[y]-W2[x]-W1[y] # IR0-ICW1-RW1, we will execute as order I1[x]-I2[y]-R2[x]-C2-W3[y]-R3[y]-W1[y] -def execute_txn(IsPredicate, num, ops, need_ac, file_name, sql_count, data_num, txn, data_value, wait_op_list,data_op_list): + + +def execute_txn(IsPredicate, num, ops, need_ac, file_name, sql_count, data_num, txn, data_value, wait_op_list, data_op_list): for i in range(num): op1 = ops[i][0] if ops[i][1] == "C" or ops[i][1] == "A": @@ -725,51 +765,62 @@ def execute_txn(IsPredicate, num, ops, need_ac, file_name, sql_count, data_num, op2 = ops[i][1] op_num = int(ops[i][2:]) next_txn_count = i + 2 - if next_txn_count == num+1: next_txn_count = 1 + if next_txn_count == num+1: + next_txn_count = 1 if i == 0: - sql_count = execute_sql(IsPredicate, file_name, sql_count, i+1, op_num, op1, data_num, txn, data_value, data_op_list) + sql_count = execute_sql(IsPredicate, file_name, sql_count, i+1, op_num, + op1, data_num, txn, data_value, data_op_list) sql_count += 1 wait_op_list.append(Wait_Operation(op2, next_txn_count, op_num)) else: last_op_type, last_op_num = get_last_op(ops, i) if last_op_num == op_num: - #if last_op_num and last_op_type both are same as now operation, we will only execute later operation + # if last_op_num and last_op_type both are same as now operation, we will only execute later operation if op1 == last_op_type: wait_op_list.pop() for data in wait_op_list: - sql_count = execute_sql(IsPredicate, file_name, sql_count, data.txn_num, data.op_num, data.op_type, data_num, txn, data_value, data_op_list) + sql_count = execute_sql(IsPredicate, file_name, sql_count, data.txn_num, + data.op_num, data.op_type, data_num, txn, data_value, data_op_list) sql_count += 1 if need_ac[data.txn_num] != "NO" and data.op_num != op_num: - sql_count = execute_sql(IsPredicate, file_name, sql_count, data.txn_num, data.op_num, need_ac[data.txn_num], data_num, txn, data_value, data_op_list) + sql_count = execute_sql(IsPredicate, file_name, sql_count, data.txn_num, + data.op_num, need_ac[data.txn_num], data_num, txn, data_value, data_op_list) sql_count += 1 wait_op_list = [] - sql_count = execute_sql(IsPredicate, file_name, sql_count, i+1, op_num, op1, data_num, txn, data_value, data_op_list) + sql_count = execute_sql(IsPredicate, file_name, sql_count, i+1, op_num, + op1, data_num, txn, data_value, data_op_list) sql_count += 1 if need_ac[i+1] != "NO": - sql_count = execute_sql(IsPredicate, file_name, sql_count, i+1, op_num, need_ac[i+1], data_num, txn, data_value, data_op_list) + sql_count = execute_sql(IsPredicate, file_name, sql_count, i+1, op_num, + need_ac[i+1], data_num, txn, data_value, data_op_list) sql_count += 1 - - sql_count = execute_sql(IsPredicate, file_name, sql_count, next_txn_count, op_num, op2, data_num, txn, data_value, data_op_list) + + sql_count = execute_sql(IsPredicate, file_name, sql_count, next_txn_count, + op_num, op2, data_num, txn, data_value, data_op_list) sql_count += 1 else: - sql_count = execute_sql(IsPredicate, file_name, sql_count, i+1, op_num, op1, data_num, txn, data_value, data_op_list) + sql_count = execute_sql(IsPredicate, file_name, sql_count, i+1, op_num, + op1, data_num, txn, data_value, data_op_list) sql_count += 1 if len(wait_op_list) == 0: if need_ac[i+1] != "NO": - sql_count = execute_sql(IsPredicate, file_name, sql_count, i+1, op_num, need_ac[i+1], data_num, txn, data_value, data_op_list) + sql_count = execute_sql(IsPredicate, file_name, sql_count, i+1, op_num, + need_ac[i+1], data_num, txn, data_value, data_op_list) sql_count += 1 wait_op_list.append(Wait_Operation(op2, next_txn_count, op_num)) for data in wait_op_list: - sql_count = execute_sql(IsPredicate, file_name, sql_count, data.txn_num, data.op_num, data.op_type, data_num, txn, data_value, data_op_list) + sql_count = execute_sql(IsPredicate, file_name, sql_count, data.txn_num, data.op_num, + data.op_type, data_num, txn, data_value, data_op_list) sql_count += 1 if need_ac[data.txn_num] != "NO": - sql_count = execute_sql(IsPredicate, file_name, sql_count, data.txn_num, data.op_num, need_ac[data.txn_num], data_num, txn, data_value, data_op_list) + sql_count = execute_sql(IsPredicate, file_name, sql_count, data.txn_num, data.op_num, + need_ac[data.txn_num], data_num, txn, data_value, data_op_list) sql_count += 1 wait_op_list = [] @@ -785,10 +836,12 @@ def execute_txn(IsPredicate, num, ops, need_ac, file_name, sql_count, data_num, - op_num (int): The number of operations. - data_num (int): The number of data parameters. -This function writes a description for the test case to the specified file. The description includes +This function writes a description for the test case to the specified file. The description includes information about the test case pattern, parameters, and structure. """ + + def write_description(file_name, txn_num, op_num, data_num): with open(file_name, "w+") as file_test: description = "#\n" @@ -798,20 +851,262 @@ def write_description(file_name, txn_num, op_num, data_num): for i in range(len(patterns)): description += "T" + str(i+1) + " ==" + patterns[i] + "==> " description += "T1\n" - description += "# Parameters: #column=2 #txn=" + str(txn_num) + " #operations=" + str(op_num) + " #variable=" + str(data_num) + "\n" - description += "# Structure: Sequence-Session-Query" + "\n" - description += "# When sequence=0, it is a preparation phase, otherwise an execution phase" + "\n" + description += "# Parameters: #column=2 #txn=" + \ + str(txn_num) + " #operations=" + str(op_num) + " #variable=" + str(data_num) + "\n" + description += "# Structure: Sequence-Session-Query" + "\n" + description += "# When sequence=0, it is a preparation phase, otherwise an execution phase" + "\n" description += "#\n" file_test.write(description) -# target folder -case_folder = "t/test_case_v2" -# pattern files -do_test_list = "do_test_list.txt" + +def check_isolation(edges, database, isolation): + if database == "mysql": + if isolation == "ru": + c = Checker(edges) + return not c.cycle() or c.deadlock() + elif isolation == "rc": + c = Checker(edges) + c.reverse_wr_edges_to_rw() + return not c.cycle() or c.deadlock() + elif isolation == "rr": + c = Checker(edges) + c.reverse_wr_edges_to_rw() + c.reverse_wcr_edges_to_rw() + return not c.cycle() or c.deadlock() + elif isolation == "ser": + return True + + +read_op_set = ["P", "R"] +write_op_set = ["I", "W", "D"] + + +class Operator: + def __init__(self, tid, type, key): + self.finished = False + self.blocked_by = None + self.tid = tid + self.type = type + self.key = key + self.pos = -1 + + def set_pos(self, pos): + self.pos = pos + + +class Checker: + def __init__(self, edges, shared_mode="no"): + self.edges = edges + self.order = [] + self.reorder = [] + self.history = {} # key -> list of (operation_type, tid, op) + self.graph = None + self.num_nodes = len(edges) + self.committed_txns = set() + self.txns = {i: [] for i in range(self.num_nodes)} + self.parse_edges() + self.reorderOp(shared_mode=shared_mode) + self.build_dependency_graph() + + def parse_edges(self): + commit = {} # tid -> Operator + for i in range(self.num_nodes): + j = (i + 1) % self.num_nodes + op_2nd = self.edges[i][-2] + key = self.edges[i][-1] + next_op_1st = self.edges[j][0] + next_key = self.edges[j][-1] + if len(self.edges[i]) == 4: + self.order.append(Operator(i, "C", -1)) # txn commit + commit[i] = self.order[-1] + if j == 0: + self.order.append(Operator(j, op_2nd, key)) + self.order.insert(0, Operator(j, next_op_1st, next_key)) + else: + if key == next_key: # do not swap the operate in txn j + if op_2nd == next_op_1st: + self.order.append(Operator(j, op_2nd, key)) # first op of txn j + else: + self.order.append(Operator(j, op_2nd, key)) + self.order.append(Operator(j, next_op_1st, next_key)) + else: # swap the operate in txn j + self.order.append(Operator(j, next_op_1st, next_key)) + self.order.append(Operator(j, op_2nd, key)) + for i in range(self.num_nodes): + if i not in commit: + self.order.append(Operator(i, "C", -1)) # txn commit + for op in self.order: + self.txns[op.tid].append(op) + for txn in self.txns.values(): + for i in range(1, len(txn)): + txn[i].blocked_by = txn[i - 1] + + def reorderOp(self, exclusive_mode="long", shared_mode="no"): + txns_block_key = {i: 0 for i in range(self.num_nodes)} + after = {} # op -> key(lock item) + lock_table = {} # key -> (lock_type, set of transaction_ids) + + def log_operation(op, operation_type): + if op.key not in self.history: + self.history[op.key] = [] + + if op.tid in self.committed_txns: + if operation_type == 'W': + operation_type = 'WC' + elif operation_type == 'R': + operation_type = 'RC' + + self.history[op.key].append((operation_type, op.tid, op)) + + def process_operation(op): + if op.type == "C": + self.reorder.append(op) + op.set_pos(len(self.reorder)) + op.finished = True + self.committed_txns.add(op.tid) + # log_operation(op, 'C') + for cop in self.txns[op.tid]: + if cop.key in lock_table: + lock_type, holders = lock_table[cop.key] + if op.tid in holders: + holders.remove(op.tid) + if not holders: + del lock_table[cop.key] + blocked_ops = after.pop(cop.key, []) + for blocked_op in blocked_ops: + process_operation(blocked_op) + return + + if op.type in write_op_set: + if op.key in lock_table: + current_lock_type, holders = lock_table[op.key] + if (current_lock_type == 'X' or current_lock_type == 'S') and op.tid not in holders: + after.setdefault(op.key, []).append(op) + txns_block_key[op.tid] = op.key + return + lock_table[op.key] = ('X', {op.tid}) # lock + self.reorder.append(op) + op.set_pos(len(self.reorder)) + op.finished = True + log_operation(op, 'W') + return + if op.type in read_op_set: + if shared_mode != "no": + if op.key in lock_table: + current_lock_type, holders = lock_table[op.key] + if current_lock_type == 'X' and op.tid not in holders: + after.setdefault(op.key, []).append(op) + txns_block_key[op.tid] = op.key + return + self.reorder.append(op) + op.set_pos(len(self.reorder)) + op.finished = True + log_operation(op, 'R') + if shared_mode == "long": + if op.key in lock_table: + lock_table[op.key][1].add(op.tid) + else: + lock_table[op.key] = ('S', {op.tid}) + return + for op in self.order: + if op.blocked_by and not op.blocked_by.finished: + after[txns_block_key[op.tid]].append(op) + elif op.blocked_by is None or op.blocked_by.finished: + process_operation(op) + + def deadlock(self): + return len(self.reorder) != len(self.order) + + def cycle(self): + try: + cycle = nx.find_cycle(self.graph, orientation='original') + # print("Found a cycle:") + # print(" -> ".join(f"{edge[0]}->{edge[1]}" for edge in cycle)) + return True + except nx.NetworkXNoCycle: + # print("No cycles found.") + return False + + def build_dependency_graph(self): + self.graph = nx.MultiDiGraph() + for key, operations in self.history.items(): + seen_transactions = set() + for i, (op_type, tid, op) in enumerate(operations): + if tid not in self.graph: + self.graph.add_node(tid) + for seen_tid in seen_transactions: + if seen_tid != tid: + prev_op_type, prev_tid, prev_op = operations[i - 1] + if prev_op_type == 'WC' and op_type.startswith('R'): + key = 'WCR' + elif prev_op_type == 'RC' and op_type.startswith('W'): + key = 'RCW' + elif prev_op_type == 'WC' and op_type.startswith('W'): + key = 'WCW' + elif prev_op_type.startswith('W') and op_type.startswith('R'): + key = 'WR' + elif prev_op_type.startswith('R') and op_type.startswith('W'): + key = 'RW' + elif prev_op_type.startswith('W') and op_type.startswith('W'): + key = 'WW' + else: + key = 'unknown' + self.graph.add_edge(seen_tid, tid, key=key) + seen_transactions.add(tid) + + def reverse_wr_edges_to_rw(self): + edges_to_reverse = [(u, v, key) for u, v, key in self.graph.edges(keys=True) if key == 'WR'] + for u, v, data in edges_to_reverse: + self.graph.add_edge(v, u, key='RW') + self.graph.remove_edge(u, v, key='WR') + + def reverse_wcr_edges_to_rw(self): + edges_to_reverse = [] + for u, v, key in self.graph.edges(keys=True): + if key == 'WCR': + txn_u = self.txns[u] + txn_v = self.txns[v] + w = txn_u[-2] + br = txn_v[0] + if br.pos < w.pos and br.type in read_op_set: + edges_to_reverse.append((u, v, key)) + for u, v, key in edges_to_reverse: + self.graph.add_edge(v, u, key='RW') + self.graph.remove_edge(u, v, key=key) + + + def printHistory(self): + print("Operation History:") + for key, operations in self.history.items(): + print(f"Key: {key}") + for operation in operations: + op_type, tid, op = operation + status = "committed" if tid in self.committed_txns else "pending" + print(f" - {op_type} by T{tid} [{status}]") + + # [single,distributed] => for local test or distributed test db_type = sys.argv[1] # [tdsql] => for pg/sql standard queries test_type = sys.argv[2] + +database = sys.argv[3] +isolation = sys.argv[4] + +# # [single,distributed] => for local test or distributed test +# db_type = "single" +# # # [tdsql] => for pg/sql standard queries +# test_type = "" + +# database = "mysql" +# isolation = "rc" + +# target folder +case_folder = f"t/test_case_v2_{database}_{isolation}" +# pattern files +do_test_list = f"do_test_list" + + max_time = 99999999999999999999 with open(do_test_list, "r") as f: lines = f.readlines() @@ -833,7 +1128,8 @@ def write_description(file_name, txn_num, op_num, data_num): if popg[0] == "#": continue ops = popg.split('-') - + if not check_isolation(ops, database, isolation): # can not break the cycles in pop + continue path_store = case_folder if not os.path.exists(path_store): os.mkdir(path_store) @@ -864,7 +1160,7 @@ def write_description(file_name, txn_num, op_num, data_num): insert_num = num + 1 else: insert_num = num - + # barrier data insertion for i in range(1, insert_num + 1): insert_data(file_name, sql_count, txn_count, cur_count, partition_num, insert_table, data_num, exist, @@ -875,8 +1171,9 @@ def write_description(file_name, txn_num, op_num, data_num): cur_count += 1 partition_num ^= 2 insert_table += 1 - if table_num == 1: insert_table = 1 - + if table_num == 1: + insert_table = 1 + # for a data, if W and D operation at an earlier time, we will insert this data when initialized for i in range(len(ops)): if ops[i][1] == "C" or ops[i][1] == "A": @@ -887,24 +1184,27 @@ def write_description(file_name, txn_num, op_num, data_num): visit[op_num] = True if ops[i][0] == "R" or ops[i][0] == "P": if ops[i].find("I") == -1: - insert_data(file_name, sql_count, txn_count, 2*op_num+1, partition_num, insert_table, data_num, exist, data_value) + insert_data(file_name, sql_count, txn_count, 2*op_num+1, + partition_num, insert_table, data_num, exist, data_value) elif ops[i][0] == "D" or ops[i][0] == "W": insert_data(file_name, sql_count, txn_count, 2*op_num+1, partition_num, insert_table, data_num, exist, data_value) partition_num ^= 2 insert_table += 1 - if table_num == 1: insert_table = 1 + if table_num == 1: + insert_table = 1 # execution txn = [Txn() for i in range(num + 2)] wait_op_list = [] data_op_list = [[] for i in range(2*num + 2)] - + # need_ac means whether need abort/commit immediately and if necessary which operation need to be done need_ac = ["NO"] * (num + 1) sql_count += 1 - sql_count = execute_txn(IsPredicate, num, ops, need_ac, file_name, sql_count, data_num, txn, data_value, wait_op_list,data_op_list) - + sql_count = execute_txn(IsPredicate, num, ops, need_ac, file_name, sql_count, + data_num, txn, data_value, wait_op_list, data_op_list) + # reorder statements so that conflict-free statements execute first # sql_count = execute_first(IsPredicate, num, ops, file_name, sql_count, txn_count, data_num, txn, data_value, data_op_list) # txn_count = 2