41
41
report_interval ,
42
42
errors_since_last_report = false ,
43
43
summary_file ,
44
- errors_file }).
44
+ errors_file ,
45
+ last_warn = {0 ,0 ,0 }}).
45
46
47
+ -define (WARN_INTERVAL , 1000 ). % Warn once a second
46
48
% % ====================================================================
47
49
% % API
48
50
% % ====================================================================
@@ -60,9 +62,14 @@ op_complete(Op, ok, ElapsedUs) ->
60
62
op_complete (Op , {ok , 1 }, ElapsedUs );
61
63
op_complete (Op , {ok , Units }, ElapsedUs ) ->
62
64
% % Update the histogram and units counter for the op in question
63
- % folsom_metrics:notify({latencies, Op}, ElapsedUs),
64
- % folsom_metrics:notify({units, Op}, {inc, Units}),
65
- gen_server :cast ({global , ? MODULE }, {Op , {ok , Units }, ElapsedUs }),
65
+ % io:format("Get distributed: ~p~n", [get_distributed()]),
66
+ case get_distributed () of
67
+ true ->
68
+ gen_server :cast ({global , ? MODULE }, {Op , {ok , Units }, ElapsedUs });
69
+ false ->
70
+ folsom_metrics :notify ({latencies , Op }, ElapsedUs ),
71
+ folsom_metrics :notify ({units , Op }, {inc , Units })
72
+ end ,
66
73
ok ;
67
74
op_complete (Op , Result , ElapsedUs ) ->
68
75
gen_server :call ({global , ? MODULE }, {op , Op , Result , ElapsedUs }).
@@ -141,10 +148,21 @@ handle_call({op, Op, {error, Reason}, _ElapsedUs}, _From, State) ->
141
148
increment_error_counter ({Op , Reason }),
142
149
{reply , ok , State # state { errors_since_last_report = true }}.
143
150
144
- handle_cast ({Op , {ok , Units }, ElapsedUs }, State ) ->
151
+ handle_cast ({Op , {ok , Units }, ElapsedUs }, State = # state {last_write_time = LWT , report_interval = RI }) ->
152
+ TimeSinceLastReport = timer :now_diff (os :timestamp (), LWT ) / 1000 , % % To get the diff in seconds
153
+ TimeSinceLastWarn = timer :now_diff (os :timestamp (), State # state .last_warn ) / 1000 ,
154
+ if
155
+ TimeSinceLastReport > (RI * 2 ) andalso TimeSinceLastWarn > ? WARN_INTERVAL ->
156
+ ? WARN (" basho_bench_stats has not reported in ~.2f milliseconds" , [TimeSinceLastReport ]),
157
+ {message_queue_len , QLen } = process_info (self (), message_queue_len ),
158
+ ? WARN (" stats process mailbox size = ~w " , [QLen ]),
159
+ NewState = State # state {last_warn = os :timestamp ()};
160
+ true ->
161
+ NewState = State
162
+ end ,
145
163
folsom_metrics :notify ({latencies , Op }, ElapsedUs ),
146
164
folsom_metrics :notify ({units , Op }, {inc , Units }),
147
- {noreply , State };
165
+ {noreply , NewState };
148
166
handle_cast (_ , State ) ->
149
167
{noreply , State }.
150
168
@@ -173,6 +191,21 @@ code_change(_OldVsn, State, _Extra) ->
173
191
% % Internal functions
174
192
% % ====================================================================
175
193
194
+ % % Uses the process dictionary to memoize checks
195
+ % % for checking if we're running in distributed mode
196
+ % % as constantly checking in with a centralized gen_server
197
+ % % would impede progress
198
+
199
+ get_distributed () ->
200
+ case erlang :get (distribute_work ) of
201
+ undefined ->
202
+ DistributeWork = basho_bench_config :get (distribute_work , false ),
203
+ erlang :put (distribute_work , DistributeWork ),
204
+ DistributeWork ;
205
+ DistributeWork ->
206
+ DistributeWork
207
+ end .
208
+
176
209
op_csv_file ({Label , _Op }) ->
177
210
Fname = normalize_label (Label ) ++ " _latencies.csv" ,
178
211
{ok , F } = file :open (Fname , [raw , binary , write ]),
0 commit comments