diff --git a/src/include/aerospike/as_metrics.h b/src/include/aerospike/as_metrics.h index 33fa08cf2..7c9df3f00 100644 --- a/src/include/aerospike/as_metrics.h +++ b/src/include/aerospike/as_metrics.h @@ -78,19 +78,19 @@ typedef struct as_policy_metrics_s { struct as_metrics_listeners_s* metrics_listeners; - FILE* file; + void* udata; } as_policy_metrics; struct as_cluster_s; struct as_node_s; -typedef void (*as_metrics_enable_callback)(struct as_policy_metrics_s* policy); +typedef void (*as_metrics_enable_callback)(const struct as_policy_metrics_s* policy); -typedef void (*as_metrics_snapshot_callback)(const struct as_policy_metrics_s* policy, const struct as_cluster_s* cluster); +typedef void (*as_metrics_snapshot_callback)(const struct as_cluster_s* cluster, void* udata); -typedef void (*as_metrics_node_close_callback)(const struct as_policy_metrics_s* policy, const struct as_node_s* node); +typedef void (*as_metrics_node_close_callback)(const struct as_node_s* node, void* udata); -typedef void (*as_metrics_disable_callback)(struct as_policy_metrics_s* policy, const struct as_cluster_s* cluster); +typedef void (*as_metrics_disable_callback)(const struct as_cluster_s* cluster, void* udata); typedef struct as_metrics_listeners_s { as_metrics_enable_callback enable_callback; @@ -103,6 +103,22 @@ typedef struct as_node_metrics_s { as_latency_buckets* latency; } as_node_metrics; +typedef struct as_metrics_writer_s { + FILE* file; + + as_string_builder* sb; + + bool enable; + + uint64_t max_size; + + uint64_t size; + + int32_t latency_columns; + + int32_t latency_shift; +} as_metrics_writer; + const char* utc_time_str(time_t t); @@ -137,13 +153,16 @@ void as_metrics_process_cpu_load_mem_usage(double* cpu_usage, double* mem); void -as_metrics_write_cluster(struct as_policy_metrics_s* policy, const struct as_cluster_s* cluster); +as_metrics_write_cluster(as_metrics_writer* mw, const struct as_cluster_s* cluster); + +void +as_metrics_write_node(as_metrics_writer* mw, struct as_node_stats_s* node_stats); void -as_metrics_write_node(as_string_builder* sb, struct as_node_stats_s* node_stats); +as_metrics_write_conn(as_metrics_writer* mw, struct as_conn_stats_s* conn_stats); void -as_metrics_write_conn(as_string_builder* sb, struct as_conn_stats_s* conn_stats); +as_metrics_write_line(as_metrics_writer* mw); #if defined(__linux__) void diff --git a/src/include/aerospike/as_node.h b/src/include/aerospike/as_node.h index 827b77fb2..52661d68d 100644 --- a/src/include/aerospike/as_node.h +++ b/src/include/aerospike/as_node.h @@ -298,6 +298,9 @@ typedef struct as_node_s { */ as_racks* racks; + /** + * Node metrics + */ as_node_metrics* metrics; /** diff --git a/src/main/aerospike/as_cluster.c b/src/main/aerospike/as_cluster.c index f540db9df..45ec18ec4 100644 --- a/src/main/aerospike/as_cluster.c +++ b/src/main/aerospike/as_cluster.c @@ -559,7 +559,7 @@ as_cluster_enable_metrics(as_cluster* cluster, as_policy_metrics* policy) { if (cluster->metrics_enabled) { - cluster->metrics_listeners->disable_callback(policy, cluster); + cluster->metrics_listeners->disable_callback(policy, cluster, policy->udata); } cluster->metrics_listeners = policy->metrics_listeners; @@ -576,7 +576,7 @@ as_cluster_enable_metrics(as_cluster* cluster, as_policy_metrics* policy) as_node_enable_metrics(node, policy); } - cluster->metrics_listeners->enable_callback(policy); + cluster->metrics_listeners->enable_callback(policy, policy->udata); } void @@ -585,7 +585,7 @@ as_cluster_disable_metrics(as_cluster* cluster) if (cluster->metrics_enabled) { cluster->metrics_enabled = false; - cluster->metrics_listeners->disable_callback(cluster->metrics_policy, cluster); + cluster->metrics_listeners->disable_callback(cluster->metrics_policy, cluster, cluster->metrics_policy->udata); } } @@ -968,7 +968,7 @@ as_cluster_tend(as_cluster* cluster, as_error* err, bool is_init) if (cluster->metrics_enabled && (cluster->tend_count % cluster->metrics_policy->interval)) { - cluster->metrics_listeners->snapshot_callback(cluster->metrics_policy, cluster); + cluster->metrics_listeners->snapshot_callback(cluster->metrics_policy, cluster, cluster->metrics_policy->udata); } as_cluster_destroy_peers(&peers); diff --git a/src/main/aerospike/as_metrics.c b/src/main/aerospike/as_metrics.c index eaa187935..2eb48f7fb 100644 --- a/src/main/aerospike/as_metrics.c +++ b/src/main/aerospike/as_metrics.c @@ -45,7 +45,6 @@ as_metrics_policy_init(as_policy_metrics* policy) policy->interval = 30; policy->latency_columns = 7; policy->latency_shift = 1; - policy->file = NULL; } char* @@ -143,7 +142,7 @@ as_metrics_add_latency(as_node_metrics* node_metrics, as_latency_type latency_ty } void -as_metrics_enable(struct as_policy_metrics_s* policy) +as_metrics_writer_enable(const struct as_policy_metrics_s* policy) { if (policy->report_size_limit != 0 && policy->report_size_limit < MIN_FILE_SIZE) { @@ -151,73 +150,79 @@ as_metrics_enable(struct as_policy_metrics_s* policy) } // create file directory - policy->file = fopen(policy->report_directory, "w"); - - as_string_builder sb; - as_string_builder_inita(&sb, 25, true); - as_string_builder_append(&sb, utc_time_str(time(NULL))); - as_string_builder_append(&sb, " header(1)"); - as_string_builder_append(&sb, " cluster[name,cpu,mem,invalidNodeCount,tranCount,retryCount,delayQueueTimeoutCount,eventloop[],node[]]"); - as_string_builder_append(&sb, " eventloop[processSize,queueSize]"); - as_string_builder_append(&sb, " node[name,address,port,syncConn,asyncConn,errors,timeouts,latency[]]"); - as_string_builder_append(&sb, " conn[inUse,inPool,opened,closed]"); - as_string_builder_append(&sb, " latency("); - as_string_builder_append(&sb, policy->latency_columns); - as_string_builder_append(&sb, ','); - as_string_builder_append(&sb, policy->latency_shift); - as_string_builder_append(&sb, ')'); - as_string_builder_append(&sb, "[type[l1,l2,l3...]]"); - fprintf(policy->file, "\n"); - fprintf(policy->file, sb.data); + as_metrics_writer* mw = policy->udata; + mw->file = fopen(policy->report_directory, "w"); + mw->max_size = policy->report_size_limit; + mw->latency_columns = policy->latency_columns; + mw->latency_shift = policy->latency_shift; + mw->size = 0; + + as_string_builder_inita(mw->sb, 25, true); + as_string_builder_append(&mw->sb, utc_time_str(time(NULL))); + as_string_builder_append(&mw->sb, " header(1)"); + as_string_builder_append(&mw->sb, " cluster[name,cpu,mem,invalidNodeCount,tranCount,retryCount,delayQueueTimeoutCount,eventloop[],node[]]"); + as_string_builder_append(&mw->sb, " eventloop[processSize,queueSize]"); + as_string_builder_append(&mw->sb, " node[name,address,port,syncConn,asyncConn,errors,timeouts,latency[]]"); + as_string_builder_append(&mw->sb, " conn[inUse,inPool,opened,closed]"); + as_string_builder_append(&mw->sb, " latency("); + as_string_builder_append(&mw->sb, mw->latency_columns); + as_string_builder_append(&mw->sb, ','); + as_string_builder_append(&mw->sb, mw->latency_shift); + as_string_builder_append(&mw->sb, ')'); + as_string_builder_append(&mw->sb, "[type[l1,l2,l3...]]"); + as_metrics_write_line(mw); + + mw->enable = true; } void -as_metrics_snapshot(const struct as_policy_metrics_s* policy, const struct as_cluster_s* cluster) +as_metrics_writer_snapshot(const struct as_cluster_s* cluster, void* udata) { - if (policy->file != NULL) + as_metrics_writer* mw = udata; + if (mw->enable && mw->file != NULL) { - as_metrics_write_cluster(policy, cluster); + as_metrics_write_cluster(mw, cluster); } } void -as_metrics_node_close(const struct as_policy_metrics_s* policy, const struct as_node_s* node) +as_metrics_writer_node_close(const struct as_node_s* node, void* udata) { // write node info to file - if (policy->file != NULL) + as_metrics_writer* mw = udata; + if (mw->enable && mw->file != NULL) { - as_string_builder sb; - as_string_builder_inita(&sb, 25, true); - as_string_builder_append(&sb, utc_time_str(time(NULL))); - as_metrics_write_node(&sb, node); - fprintf(policy->file, "\n"); - fprintf(policy->file, sb.data); + as_string_builder_append(&mw->sb, utc_time_str(time(NULL))); + as_metrics_write_node(&mw->sb, node); + as_metrics_write_line(mw); } } void -as_metrics_disable(struct as_policy_metrics_s* policy, const struct as_cluster_s* cluster) +as_metrics_writer_disable(const struct as_cluster_s* cluster, void* udata) { // write cluster into to file, disable - if (policy->file != NULL) + as_metrics_writer* mw = udata; + if (mw->enable && mw->file != NULL) { - as_metrics_write_cluster(policy, cluster); - fclose(policy->file); - policy->file = NULL; + as_metrics_write_cluster(mw, cluster); + fclose(mw->file); + mw->file = NULL; + mw->enable = false; } } void as_metrics_listeners_init(as_metrics_listeners* listeners) { - listeners->enable_callback = as_metrics_enable; - listeners->disable_callback = as_metrics_disable; - listeners->node_close_callback = as_metrics_node_close; - listeners->snapshot_callback = as_metrics_snapshot; + listeners->enable_callback = as_metrics_writer_enable; + listeners->disable_callback = as_metrics_writer_disable; + listeners->node_close_callback = as_metrics_writer_node_close; + listeners->snapshot_callback = as_metrics_writer_snapshot; } void -as_metrics_write_cluster(struct as_policy_metrics_s* policy, const struct as_cluster_s* cluster) { +as_metrics_write_cluster(as_metrics_writer* mw, const struct as_cluster_s* cluster) { char* cluster_name = cluster->cluster_name; if (cluster_name == NULL) { @@ -230,78 +235,75 @@ as_metrics_write_cluster(struct as_policy_metrics_s* policy, const struct as_clu as_cluster_stats* stats; aerospike_cluster_stats(cluster, stats); - as_string_builder sb; - as_string_builder_inita(&sb, 10, true); - as_string_builder_append(&sb, utc_time_str(time(NULL))); - as_string_builder_append(&sb, " cluster["); - as_string_builder_append(&sb, cluster_name); - as_string_builder_append(&sb, ','); - as_string_builder_append(&sb, (int)cpu_load); - as_string_builder_append(&sb, ','); - as_string_builder_append(&sb, mem); - as_string_builder_append(&sb, ','); - as_string_builder_append(&sb, cluster->invalid_node_count); // Cumulative. Not reset on each interval. - as_string_builder_append(&sb, ','); - as_string_builder_append(&sb, as_cluster_get_tran_count(cluster)); // Cumulative. Not reset on each interval. - as_string_builder_append(&sb, ','); - as_string_builder_append(&sb, cluster->retry_count); // Cumulative. Not reset on each interval. - as_string_builder_append(&sb, ','); - as_string_builder_append(&sb, cluster->delay_queue_timeout_count); // Cumulative. Not reset on each interval. - as_string_builder_append(&sb, ",["); + as_string_builder_append(&mw->sb, utc_time_str(time(NULL))); + as_string_builder_append(&mw->sb, " cluster["); + as_string_builder_append(&mw->sb, cluster_name); + as_string_builder_append(&mw->sb, ','); + as_string_builder_append(&mw->sb, (int)cpu_load); + as_string_builder_append(&mw->sb, ','); + as_string_builder_append(&mw->sb, mem); + as_string_builder_append(&mw->sb, ','); + as_string_builder_append(&mw->sb, cluster->invalid_node_count); // Cumulative. Not reset on each interval. + as_string_builder_append(&mw->sb, ','); + as_string_builder_append(&mw->sb, as_cluster_get_tran_count(cluster)); // Cumulative. Not reset on each interval. + as_string_builder_append(&mw->sb, ','); + as_string_builder_append(&mw->sb, cluster->retry_count); // Cumulative. Not reset on each interval. + as_string_builder_append(&mw->sb, ','); + as_string_builder_append(&mw->sb, cluster->delay_queue_timeout_count); // Cumulative. Not reset on each interval. + as_string_builder_append(&mw->sb, ",["); as_event_loop_stats* event_loops = stats->event_loops; for (uint32_t i = 0; i < stats->event_loops_size; i++) { as_event_loop_stats* loop = &event_loops[i]; if (i > 0) { - as_string_builder_append(&sb, ','); + as_string_builder_append(&mw->sb, ','); } - as_string_builder_append(&sb, '['); - as_string_builder_append(&sb, loop->process_size); - as_string_builder_append(&sb, ','); - as_string_builder_append(&sb, loop->queue_size); - as_string_builder_append(&sb, ']'); + as_string_builder_append(&mw->sb, '['); + as_string_builder_append(&mw->sb, loop->process_size); + as_string_builder_append(&mw->sb, ','); + as_string_builder_append(&mw->sb, loop->queue_size); + as_string_builder_append(&mw->sb, ']'); } - as_string_builder_append(&sb, '],['); - + as_string_builder_append(&mw->sb, '],['); + as_node_stats* nodes = stats->nodes; for (uint32_t i = 0; i < stats->nodes_size; i++) { as_node_stats* node = &stats->nodes[i]; if (i > 0) { - as_string_builder_append(&sb, ","); + as_string_builder_append(&mw->sb, ","); } - as_metrics_write_node(&sb, node); + as_metrics_write_node(&mw->sb, node); } - as_string_builder_append(&sb, "]]"); + as_string_builder_append(&mw->sb, "]]"); - fprintf(policy->file, "\n"); - fprintf(policy->file, sb.data); + as_metrics_write_line(mw); } void -as_metrics_write_node(as_string_builder* sb, struct as_node_stats_s* node_stats) +as_metrics_write_node(as_metrics_writer* mw, struct as_node_stats_s* node_stats) { as_node* node = node_stats->node; - as_string_builder_append(&sb, '['); - as_string_builder_append(&sb, node->name); - as_string_builder_append(&sb, ','); + as_string_builder_append(&mw->sb, '['); + as_string_builder_append(&mw->sb, node->name); + as_string_builder_append(&mw->sb, ','); //as_host* host = node-> TODO: how to get host from node? it is in node_info - //as_string_builder_append(&sb, host->name); - //as_string_builder_append(&sb, ','); - //as_string_builder_append(&sb, host->port); - //as_string_builder_append(&sb, ','); + //as_string_builder_append(&mw->sb, host->name); + //as_string_builder_append(&mw->sb, ','); + //as_string_builder_append(&mw->sb, host->port); + //as_string_builder_append(&mw->sb, ','); - as_metrics_write_conn(sb, &node_stats->sync); - as_string_builder_append(&sb, ','); - as_metrics_write_conn(sb, &node_stats->async); - as_string_builder_append(&sb, ','); + as_metrics_write_conn(&mw->sb, &node_stats->sync); + as_string_builder_append(&mw->sb, ','); + as_metrics_write_conn(&mw->sb, &node_stats->async); + as_string_builder_append(&mw->sb, ','); - as_string_builder_append(&sb, node->error_count); - as_string_builder_append(&sb, ','); - as_string_builder_append(&sb, node->timeout_count); - as_string_builder_append(&sb, ',['); + as_string_builder_append(&mw->sb, node->error_count); + as_string_builder_append(&mw->sb, ','); + as_string_builder_append(&mw->sb, node->timeout_count); + as_string_builder_append(&mw->sb, ',['); as_node_metrics* node_metrics = node->metrics; uint32_t max = AS_LATENCY_TYPE_NONE; @@ -309,35 +311,48 @@ as_metrics_write_node(as_string_builder* sb, struct as_node_stats_s* node_stats) for (uint32_t i = 0; i < max; i++) { if (i > 0) { - as_string_builder_append(&sb, ","); + as_string_builder_append(&mw->sb, ","); } - as_string_builder_append(&sb, as_latency_type_to_string(i)); - as_string_builder_append(&sb, '['); + as_string_builder_append(&mw->sb, as_latency_type_to_string(i)); + as_string_builder_append(&mw->sb, '['); as_latency_buckets* buckets = &node_metrics->latency[i]; uint32_t bucket_max = buckets->latency_columns; for (uint32_t j = 0; j < bucket_max; j++) { if (j > 0) { - as_string_builder_append(&sb, ','); + as_string_builder_append(&mw->sb, ','); } - as_string_builder_append(&sb, as_metrics_get_bucket(&buckets, i)); + as_string_builder_append(&mw->sb, as_metrics_get_bucket(&buckets, i)); } - as_string_builder_append(&sb, ']'); + as_string_builder_append(&mw->sb, ']'); } - as_string_builder_append(&sb, ']]'); + as_string_builder_append(&mw->sb, ']]'); } void -as_metrics_write_conn(as_string_builder* sb, struct as_conn_stats_s* conn_stats) +as_metrics_write_conn(as_metrics_writer* mw, struct as_conn_stats_s* conn_stats) { - as_string_builder_append(&sb, conn_stats->in_use); - as_string_builder_append(&sb, ','); - as_string_builder_append(&sb, conn_stats->in_pool); - as_string_builder_append(&sb, ','); - as_string_builder_append(&sb, conn_stats->opened); // Cumulative. Not reset on each interval. - as_string_builder_append(&sb, ','); - as_string_builder_append(&sb, conn_stats->closed); // Cumulative. Not reset on each interval. + as_string_builder_append(&mw->sb, conn_stats->in_use); + as_string_builder_append(&mw->sb, ','); + as_string_builder_append(&mw->sb, conn_stats->in_pool); + as_string_builder_append(&mw->sb, ','); + as_string_builder_append(&mw->sb, conn_stats->opened); // Cumulative. Not reset on each interval. + as_string_builder_append(&mw->sb, ','); + as_string_builder_append(&mw->sb, conn_stats->closed); // Cumulative. Not reset on each interval. +} + +void +as_metrics_write_line(as_metrics_writer* mw) +{ + as_string_builder_append_newline(&mw->sb); + fprintf(mw->file, &mw->sb->data); + mw->size += mw->sb->length; + + if (mw->max_size > 0 && mw->size >= mw->max_size) + { + // write new file? + } } #if defined(__linux__)