diff --git a/examples/basic_examples/transaction/src/main/example.c b/examples/basic_examples/transaction/src/main/example.c index b281eab9e..4087b46e8 100644 --- a/examples/basic_examples/transaction/src/main/example.c +++ b/examples/basic_examples/transaction/src/main/example.c @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -31,114 +32,166 @@ #include "example_utils.h" -static void -abort_txn(aerospike* as, as_txn* txn) -{ - as_error err; - as_status status = aerospike_abort(as, &err, txn, NULL); - - if (status != AEROSPIKE_OK) { - LOG("aerospike_abort() returned %d - %s", err.code, err.message); - } - example_cleanup(as); -} +static as_status run_commands(aerospike* as, as_txn* txn); int main(int argc, char* argv[]) { + // Parse command line arguments. if (! example_get_opts(argc, argv, EXAMPLE_BASIC_OPTS)) { - exit(-1); + return -1; } + // Connect to cluster. + as_error err; + as_status status; aerospike as; - example_connect_to_aerospike(&as); - example_remove_test_record(&as); // Initialize transaction. as_txn txn; as_txn_init(&txn); - LOG("Initialize transaction: %" PRId64, (int64_t)txn.id); + printf("Initialize transaction: %" PRIu64 "\n", txn.id); + + // Run commands in a transaction. The commands must use a single namespace and + // the namespace must be configured as strong-consistency. + status = run_commands(&as, &txn); + + if (status == AEROSPIKE_OK) { + printf("Commit transaction: %" PRIu64 "\n", txn.id); + as_commit_status commit_status; + status = aerospike_commit(&as, &err, &txn, &commit_status); + + if (status != AEROSPIKE_OK) { + printf("aerospike_commit() returned %d - %s\n", err.code, err.message); + // Do not call aerospike_abort() if the commit fails. + if (commit_status == AS_COMMIT_MARK_ROLL_FORWARD_ABANDONED) { + // The commit read-verify step succeeded, but the transaction monitor + // could not be marked for roll-forward. In this case, the transaction + // could be re-committed. + printf("Transaction can be re-committed\n"); + } + else { + // The commit read-verify step failed. The transaction has been + // permanently aborted. + printf("Transaction aborted\n"); + } + } + } + else { + printf("Abort transaction: %" PRIu64 "\n", txn.id); + as_status s = aerospike_abort(&as, &err, &txn, NULL); - // Perform commands in a transaction. - as_policy_write pw; - as_policy_write_copy(&as.config.policies.write, &pw); - pw.base.txn = &txn; + if (s != AEROSPIKE_OK) { + printf("aerospike_abort() returned %d - %s\n", err.code, err.message); + } + } + + // Cleanup. + as_txn_destroy(&txn); + aerospike_close(&as, &err); + aerospike_destroy(&as); + return status == AEROSPIKE_OK? 0 : -1; +} + +static bool batch_write_cb(const as_batch_result* results, uint32_t n_keys, void* udata); + +static as_status +run_commands(aerospike* as, as_txn* txn) +{ + as_error err; + as_status status; - LOG("aerospike_key_put()"); + printf("Write record\n"); + + as_policy_write pw; + as_policy_write_copy(&as->config.policies.write, &pw); + pw.base.txn = txn; as_key key; - as_key_init_int64(&key, "test", "demoset", 1); + as_key_init_int64(&key, g_namespace, g_set, 1); as_record rec; as_record_inita(&rec, 1); as_record_set_int64(&rec, "a", 1234); - as_error err; + status = aerospike_key_put(as, &err, &pw, &key, &rec); - if (aerospike_key_put(&as, &err, &pw, &key, &rec) != AEROSPIKE_OK) { - LOG("aerospike_key_put() returned %d - %s", err.code, err.message); - abort_txn(&as, &txn); - exit(-1); + if (status != AEROSPIKE_OK) { + printf("aerospike_key_put() 1 returned %d - %s\n", err.code, err.message); + return status; } + as_record_destroy(&rec); - LOG("aerospike_key_put()"); + printf("Write more records in a batch\n"); - as_key_init_int64(&key, "test", "demoset", 2); - as_record_set_int64(&rec, "b", 5678); + as_policy_batch pb; + as_policy_batch_copy(&as->config.policies.batch_parent_write, &pb); + pb.base.txn = txn; - if (aerospike_key_put(&as, &err, &pw, &key, &rec) != AEROSPIKE_OK) { - LOG("aerospike_key_put() returned %d - %s", err.code, err.message); - abort_txn(&as, &txn); - exit(-1); + as_operations ops; + as_operations_inita(&ops, 1); + as_operations_add_write_int64(&ops, "c", 9999); + + as_batch batch; + as_batch_inita(&batch, 2); + + for (uint32_t i = 0; i < 2; i++) { + as_key_init_int64(as_batch_keyat(&batch, i), g_namespace, g_set, i); } - as_record_destroy(&rec); - LOG("aerospike_key_get()"); + status = aerospike_batch_operate(as, &err, &pb, NULL, &batch, &ops, batch_write_cb, NULL); + as_operations_destroy(&ops); + + if (status != AEROSPIKE_OK) { + printf("aerospike_batch_operate() returned %d - %s\n", err.code, err.message); + return status; + } + as_batch_destroy(&batch); + + printf("Read record\n"); as_policy_read pr; - as_policy_read_copy(&as.config.policies.read, &pr); - pr.base.txn = &txn; + as_policy_read_copy(&as->config.policies.read, &pr); + pr.base.txn = txn; - as_key_init_int64(&key, "test", "demoset", 3); + as_key_init_int64(&key, g_namespace, g_set, 3); as_record* recp = NULL; - as_status status = aerospike_key_get(&as, &err, &pr, &key, &recp); + status = aerospike_key_get(as, &err, &pr, &key, &recp); if (status != AEROSPIKE_OK && status != AEROSPIKE_ERR_RECORD_NOT_FOUND) { - LOG("aerospike_key_get() returned %d - %s", err.code, err.message); - abort_txn(&as, &txn); - exit(-1); + printf("aerospike_key_get() returned %d - %s\n", err.code, err.message); + return status; } as_record_destroy(recp); - LOG("aerospike_key_remove()"); + printf("Delete record\n"); as_policy_remove prem; - as_policy_remove_copy(&as.config.policies.remove, &prem); - prem.base.txn = &txn; - prem.durable_delete = true; + as_policy_remove_copy(&as->config.policies.remove, &prem); + prem.base.txn = txn; + prem.durable_delete = true; // Required when deleting records in a transaction. - status = aerospike_key_remove(&as, &err, &prem, &key); + status = aerospike_key_remove(as, &err, &prem, &key); if (status != AEROSPIKE_OK && status != AEROSPIKE_ERR_RECORD_NOT_FOUND) { - LOG("aerospike_key_remove() returned %d - %s", err.code, err.message); - abort_txn(&as, &txn); - exit(-1); + printf("aerospike_key_remove() returned %d - %s\n", err.code, err.message); + return status; } - LOG("Commit transaction: %" PRId64, (int64_t)txn.id); + return AEROSPIKE_OK; +} + +static bool +batch_write_cb(const as_batch_result* results, uint32_t n_keys, void* udata) +{ + for (uint32_t i = 0; i < n_keys; i++) { + const as_batch_result* r = &results[i]; - if (aerospike_commit(&as, &err, &txn, NULL) != AEROSPIKE_OK) { - LOG("aerospike_commit() returned %d - %s", err.code, err.message); - // Do not call aerospike_abort() when commit fails. - // aerospike_commit() will attempt an abort on failure. - example_cleanup(&as); - exit(-1); + if (r->result != AEROSPIKE_OK) { + printf("batch row[%u] returned %d\n", i, r->result); + } } - - // Cleanup transaction. - as_txn_destroy(&txn); - example_cleanup(&as); - return 0; + return true; } diff --git a/examples/project/Makefile b/examples/project/Makefile index ee0666a5a..afa067d02 100644 --- a/examples/project/Makefile +++ b/examples/project/Makefile @@ -106,13 +106,6 @@ endif LD_FLAGS += -lssl -lcrypto -lpthread -lm -lz $(LINK_SUFFIX) -ifeq ($(OS),Linux) - LD_FLAGS += -lrt -ldl -else ifeq ($(OS),FreeBSD) - LD_FLAGS += -lrt -endif - -LD_FLAGS += -lm -lz CC = cc ###############################################################################