Skip to content

Commit a8f8bc0

Browse files
committed
Fixing bugs
1 parent fc7f322 commit a8f8bc0

File tree

8 files changed

+402
-412
lines changed

8 files changed

+402
-412
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ AEROSPIKE += as_key.o
136136
AEROSPIKE += as_list_operations.o
137137
AEROSPIKE += as_lookup.o
138138
AEROSPIKE += as_map_operations.o
139+
AEROSPIKE += as_metrics.o
139140
AEROSPIKE += as_node.o
140141
AEROSPIKE += as_operations.o
141142
AEROSPIKE += as_partition.o

src/include/aerospike/as_cluster.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,9 +383,13 @@ typedef struct as_cluster_s {
383383

384384
bool metrics_enabled;
385385

386-
as_policy_metrics* metrics_policy;
386+
uint32_t metrics_interval;
387387

388-
as_metrics_listeners* metrics_listeners;
388+
uint32_t metrics_latency_columns;
389+
390+
uint32_t metrics_latency_shift;
391+
392+
as_metrics_listeners metrics_listeners;
389393

390394
uint64_t retry_count;
391395

src/include/aerospike/as_metrics.h

Lines changed: 49 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -58,56 +58,54 @@ typedef uint8_t as_latency_type;
5858
* Latency bucket counts are cumulative and not reset on each metrics snapshot interval
5959
*/
6060
typedef struct as_latency_buckets_s {
61-
int32_t latency_shift;
62-
int32_t latency_columns;
61+
uint32_t latency_shift;
62+
uint32_t latency_columns;
6363
uint64_t* buckets;
6464
} as_latency_buckets;
6565

66-
struct as_metrics_listeners_s;
67-
68-
/**
69-
* Metrics Policy
70-
*/
71-
typedef struct as_policy_metrics_s {
72-
const char* report_directory; // where the metrics file is output
73-
74-
int64_t report_size_limit; // default 0
75-
76-
int32_t interval; // default 30
77-
78-
int32_t latency_columns; // default 7
79-
80-
int32_t latency_shift; // default 1
81-
82-
struct as_metrics_listeners_s* metrics_listeners;
83-
84-
void* udata;
85-
} as_policy_metrics;
86-
87-
struct as_cluster_s;
66+
struct as_policy_metrics_s;
8867
struct as_node_s;
68+
struct as_cluster_s;
8969

9070
/**
9171
* Callbacks for metrics listener operations
9272
*/
93-
typedef as_status (*as_metrics_enable_callback)(as_error* err, const struct as_policy_metrics_s* policy);
73+
typedef as_status(*as_metrics_enable_listener)(as_error* err, const struct as_policy_metrics_s* policy, void* udata);
9474

95-
typedef as_status (*as_metrics_snapshot_callback)(as_error* err, struct as_cluster_s* cluster, void* udata);
75+
typedef as_status(*as_metrics_snapshot_listener)(as_error* err, struct as_cluster_s* cluster, void* udata);
9676

97-
typedef as_status (*as_metrics_node_close_callback)(as_error* err, struct as_node_s* node, void* udata);
77+
typedef as_status(*as_metrics_node_close_listener)(as_error* err, struct as_node_s* node, void* udata);
9878

99-
typedef as_status (*as_metrics_disable_callback)(as_error* err, struct as_cluster_s* cluster, void* udata);
79+
typedef as_status(*as_metrics_disable_listener)(as_error* err, struct as_cluster_s* cluster, void* udata);
10080

10181
/**
10282
* Struct to hold required callbacks
10383
*/
10484
typedef struct as_metrics_listeners_s {
105-
as_metrics_enable_callback enable_callback;
106-
as_metrics_snapshot_callback snapshot_callback;
107-
as_metrics_node_close_callback node_close_callback;
108-
as_metrics_disable_callback disable_callback;
85+
as_metrics_enable_listener enable_listener;
86+
as_metrics_snapshot_listener snapshot_listener;
87+
as_metrics_node_close_listener node_close_listener;
88+
as_metrics_disable_listener disable_listener;
89+
void* udata;
10990
} as_metrics_listeners;
11091

92+
/**
93+
* Metrics Policy
94+
*/
95+
typedef struct as_policy_metrics_s {
96+
const char* report_directory; // where the metrics file is output
97+
98+
uint64_t report_size_limit; // default 0
99+
100+
uint32_t interval; // default 30
101+
102+
uint32_t latency_columns; // default 7
103+
104+
uint32_t latency_shift; // default 1
105+
106+
as_metrics_listeners metrics_listeners;
107+
} as_policy_metrics;
108+
111109
/**
112110
* Node metrics latency bucket struct
113111
*/
@@ -129,25 +127,32 @@ typedef struct as_metrics_writer_s {
129127

130128
uint64_t size;
131129

132-
int32_t latency_columns;
130+
uint32_t latency_columns;
133131

134-
int32_t latency_shift;
132+
uint32_t latency_shift;
135133

136134
const char* report_directory;
137135
} as_metrics_writer;
138136

139-
/**
140-
* Format time into UTC string
141-
*/
142-
const char*
143-
utc_time_str(time_t t);
144-
145137
/**
146138
* Initalize metrics policy
147139
*/
148-
void
140+
AS_EXTERN void
149141
as_metrics_policy_init(as_policy_metrics* policy);
150142

143+
static inline void
144+
as_metrics_set_listeners(
145+
as_policy_metrics* policy, as_metrics_enable_listener enable,
146+
as_metrics_disable_listener disable, as_metrics_node_close_listener node_close,
147+
as_metrics_snapshot_listener snapshot
148+
)
149+
{
150+
policy->metrics_listeners.enable_listener = enable;
151+
policy->metrics_listeners.disable_listener = disable;
152+
policy->metrics_listeners.node_close_listener = node_close;
153+
policy->metrics_listeners.snapshot_listener = snapshot;
154+
}
155+
151156
/**
152157
* Convert latency_type to string version for printing to the output file
153158
*/
@@ -158,7 +163,7 @@ as_latency_type_to_string(as_latency_type type);
158163
* Initalize latency bucket struct
159164
*/
160165
void
161-
as_metrics_latency_buckets_init(as_latency_buckets* latency_buckets, int32_t latency_columns, int32_t latency_shift);
166+
as_metrics_latency_buckets_init(as_latency_buckets* latency_buckets, uint32_t latency_columns, uint32_t latency_shift);
162167

163168
/**
164169
* Return cumulative count of a bucket.
@@ -181,72 +186,21 @@ as_metrics_get_index(as_latency_buckets* latency_buckets, uint64_t elapsed_nanos
181186
/**
182187
* Initalize node metrics struct
183188
*/
184-
void
185-
as_node_metrics_init(as_node_metrics* node_metrics, const as_policy_metrics* policy);
189+
as_node_metrics*
190+
as_node_metrics_init(uint32_t latency_columns, uint32_t latency_shift);
186191

187192
/**
188193
* Add latency to corresponding bucket type
189194
*/
190195
void
191196
as_metrics_add_latency(as_node_metrics* node_metrics, as_latency_type latency_type, uint64_t elapsed);
192197

193-
/**
194-
* Initalize metrics listener struct
195-
*/
196-
void
197-
as_metrics_listeners_init(as_metrics_listeners* listeners);
198-
199-
/**
200-
* Open output metrics file and write header
201-
*/
202-
as_status
203-
as_metrics_open_writer(as_metrics_writer* mw, as_error* err);
204-
205198
/**
206199
* Calculate CPU and memory usage
207200
*/
208201
void
209202
as_metrics_process_cpu_load_mem_usage(uint32_t* cpu_usage, uint32_t* mem);
210203

211-
struct as_cluster_s;
212-
/**
213-
* Write cluster information to the metrics output file
214-
*/
215-
as_status
216-
as_metrics_write_cluster(as_error* err, as_metrics_writer* mw, struct as_cluster_s* cluster);
217-
218-
struct as_node_s;
219-
/**
220-
* Write node information to the metrics output file
221-
*/
222-
void
223-
as_metrics_write_node(as_metrics_writer* mw, struct as_node_s* node_stats);
224-
225-
struct as_conn_stats_s;
226-
/**
227-
* Write connection information to the metrics output file
228-
*/
229-
void
230-
as_metrics_write_conn(as_metrics_writer* mw, const struct as_conn_stats_s* conn_stats);
231-
232-
/**
233-
* Calculate sync conn stats data
234-
*/
235-
void
236-
as_metrics_get_node_sync_conn_stats(const struct as_node_s* node, struct as_conn_stats_s* async);
237-
238-
/**
239-
* Calculate async conn stats data
240-
*/
241-
void
242-
as_metrics_get_node_async_conn_stats(const struct as_node_s* node, struct as_conn_stats_s* sync);
243-
244-
/**
245-
* Write line to the metrics output file
246-
*/
247-
as_status
248-
as_metrics_write_line(as_metrics_writer* mw, as_error* err);
249-
250204
#if defined(__linux__)
251205
/**
252206
* Gets memory and CPU usage information from proc/stat

src/main/aerospike/aerospike_stats.c

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -183,14 +183,7 @@ aerospike_stats_to_string(as_cluster_stats* stats)
183183
as_status
184184
aerospike_enable_metrics(aerospike* as, as_error* err, struct as_policy_metrics_s* policy)
185185
{
186-
as_cluster* cluster = as->cluster;
187-
as_status status = as_cluster_enable_metrics(err, cluster, policy);
188-
if (status != AEROSPIKE_OK)
189-
{
190-
return status;
191-
}
192-
193-
return AEROSPIKE_OK;
186+
return as_cluster_enable_metrics(err, as->cluster, policy);
194187
}
195188

196189
as_status

src/main/aerospike/as_cluster.c

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -557,31 +557,28 @@ as_cluster_remove_nodes_copy(as_cluster* cluster, as_vector* /* <as_node*> */ no
557557
as_status
558558
as_cluster_enable_metrics(as_error* err, as_cluster* cluster, as_policy_metrics* policy)
559559
{
560-
if (cluster->metrics_enabled)
561-
{
562-
cluster->metrics_listeners->disable_callback(err, cluster, policy->udata);
560+
if (cluster->metrics_enabled) {
561+
cluster->metrics_listeners.disable_listener(err, cluster, cluster->metrics_listeners.udata);
563562
}
564563

565564
cluster->metrics_listeners = policy->metrics_listeners;
566-
if (cluster->metrics_listeners == NULL)
567-
{
568-
as_metrics_listeners_init(cluster->metrics_listeners);
569-
}
570-
571-
cluster->metrics_policy = policy;
565+
cluster->metrics_interval = policy->interval;
566+
cluster->metrics_latency_columns = policy->latency_columns;
567+
cluster->metrics_latency_shift = policy->latency_shift;
572568

573-
as_nodes* nodes = cluster->nodes;
569+
as_nodes* nodes = as_nodes_reserve(cluster);
574570
for (uint32_t i = 0; i < nodes->size; i++) {
575571
as_node* node = nodes->array[i];
576572
as_node_enable_metrics(node, policy);
577573
}
578574

579-
as_status status = cluster->metrics_listeners->enable_callback(err, policy);
580-
if (status != AEROSPIKE_OK)
581-
{
575+
as_nodes_release(nodes);
576+
577+
as_status status = cluster->metrics_listeners.enable_listener(err, policy, cluster->metrics_listeners.udata);
578+
if (status != AEROSPIKE_OK) {
582579
return status;
583580
}
584-
581+
cluster->metrics_enabled = true;
585582
return AEROSPIKE_OK;
586583
}
587584

@@ -591,11 +588,7 @@ as_cluster_disable_metrics(as_error* err, as_cluster* cluster)
591588
if (cluster->metrics_enabled)
592589
{
593590
cluster->metrics_enabled = false;
594-
as_status status = cluster->metrics_listeners->disable_callback(err, cluster, cluster->metrics_policy->udata);
595-
if (status != AEROSPIKE_OK)
596-
{
597-
return status;
598-
}
591+
return cluster->metrics_listeners.disable_listener(err, cluster, cluster->metrics_listeners.udata);
599592
}
600593

601594
return AEROSPIKE_OK;
@@ -659,7 +652,7 @@ as_cluster_remove_nodes(as_error* err, as_cluster* cluster, as_vector* /* <as_no
659652
as_node_deactivate(node);
660653

661654
if (cluster->metrics_enabled) {
662-
as_status status = cluster->metrics_listeners->node_close_callback(err, node, node->cluster->metrics_policy->udata);
655+
as_status status = cluster->metrics_listeners.node_close_listener(err, node, node->cluster->metrics_listeners.udata);
663656
if (status != AEROSPIKE_OK)
664657
{
665658
return status;
@@ -988,9 +981,9 @@ as_cluster_tend(as_cluster* cluster, as_error* err, bool is_init)
988981
as_incr_uint32(&cluster->shm_info->cluster_shm->rebalance_gen);
989982
}
990983

991-
if (cluster->metrics_enabled && (cluster->tend_count % cluster->metrics_policy->interval))
984+
if (cluster->metrics_enabled && (cluster->tend_count % cluster->metrics_interval))
992985
{
993-
as_status status = cluster->metrics_listeners->snapshot_callback(err, cluster, cluster->metrics_policy->udata);
986+
as_status status = cluster->metrics_listeners.snapshot_listener(err, cluster, cluster->metrics_listeners.udata);
994987
if (status != AEROSPIKE_OK)
995988
{
996989
return status;
@@ -1510,6 +1503,15 @@ as_cluster_create(as_config* config, as_error* err, as_cluster** cluster_out)
15101503
}
15111504
pthread_attr_destroy(&attr);
15121505
}
1506+
1507+
// Initialize metrics fields
1508+
cluster->metrics_enabled = false;
1509+
cluster->metrics_interval = 0;
1510+
cluster->metrics_latency_columns = 0;
1511+
cluster->metrics_latency_shift = 0;
1512+
cluster->tran_count = 0;
1513+
cluster->retry_count = 0;
1514+
cluster->delay_queue_timeout_count = 0;
15131515
*cluster_out = cluster;
15141516
return AEROSPIKE_OK;
15151517
}

0 commit comments

Comments
 (0)