Skip to content

Commit

Permalink
Merge remote-tracking branch 'contrib-sargun/master' into slf-merge-s…
Browse files Browse the repository at this point in the history
…argun-master
  • Loading branch information
slfritchie committed Oct 20, 2014
2 parents 042e8ea + 4c92409 commit 0faae29
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 47 deletions.
42 changes: 42 additions & 0 deletions examples/riakc_pb_distributed.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{mode, max}.

{duration, 10}.
{report_interval,1}.

{concurrent, 50}.

{driver, basho_bench_driver_riakc_pb}.

{key_generator, {int_to_bin_bigendian, {uniform_int, 10000}}}.

{value_generator, {fixed_bin, 10000}}.

{riakc_pb_ips, [{127,0,0,1}]}.

{riakc_pb_replies, 1}.

%%% {operations, [{get, 1}]}.
{operations, [{get, 1}, {update, 1}]}.

%% Use {auto_reconnect, false} to get "old" behavior (prior to April 2013).
%% See deps/riakc/src/riakc_pb_socket.erl for all valid socket options.
{pb_connect_options, [{auto_reconnect, true}]}.

%% Overrides for the PB client's default 60 second timeout, on a
%% per-type-of-operation basis. All timeout units are specified in
%% milliseconds. The pb_timeout_general config item provides a
%% default timeout if the read/write/listkeys/mapreduce timeout is not
%% specified.

{pb_timeout_general, 30000}.
{pb_timeout_read, 5000}.
{pb_timeout_write, 5000}.
{pb_timeout_listkeys, 50000}.
%% The general timeout will be used because this specific item is commented:
%% {pb_timeout_mapreduce, 50000}.


%% Remote_nodes must be in the format of [{fqdn, nodename}]
%% basho_bench / distributed Erlang use longnames
{remote_nodes, [{'3c075477e55e-2.local', 'bb25'}]}.
{distribute_work, true}.
4 changes: 2 additions & 2 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

{escript_incl_apps, [lager, getopt, bear, folsom, ibrowse, riakc, riak_pb, mochiweb, protobuffs, velvet, goldrush]}.

{escript_emu_args, "%%! +K true\n"}.
{escript_emu_args, "%%! +K true -rsh ssh\n"}.
%% Use this for the Java client bench driver
%% {escript_emu_args, "%%! +K true -name [email protected] -setcookie YOUR_ERLANG_COOKIE\n"}.
{escript_emu_args, "%%! +K true -name [email protected] -setcookie YOUR_ERLANG_COOKIE\n"}.
{escript_emu_args, "%%! +K true -name [email protected] -setcookie YOUR_ERLANG_COOKIE -rsh ssh\n"}.
76 changes: 72 additions & 4 deletions src/basho_bench.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
-module(basho_bench).

-export([main/1, md5/1]).

-include("basho_bench.hrl").

%% ====================================================================
Expand Down Expand Up @@ -53,6 +52,8 @@ main(Args) ->
{error, {already_loaded, basho_bench}} -> ok
end,
register(basho_bench, self()),
%% TODO: Move into a proper supervision tree, janky for now
{ok, _Pid} = basho_bench_config:start_link(),
basho_bench_config:set(test_id, BenchName),

application:load(lager),
Expand Down Expand Up @@ -93,15 +94,16 @@ main(Args) ->
%% Copy the config into the test dir for posterity
[ begin {ok, _} = file:copy(Config, filename:join(TestDir, filename:basename(Config))) end
|| Config <- Configs ],

case basho_bench_config:get(distribute_work, false) of
true -> setup_distributed_work();
false -> ok
end,
%% Set our CWD to the test dir
ok = file:set_cwd(TestDir),

log_dimensions(),

%% Run pre_hook for user code preconditions
run_pre_hook(),

%% Spin up the application
ok = basho_bench_app:start(),

Expand Down Expand Up @@ -139,6 +141,7 @@ maybe_net_node(Opts) ->
case lists:keyfind(net_node, 1, Opts) of
{_, Node} ->
{_, Cookie} = lists:keyfind(net_cookie, 1, Opts),
os:cmd("epmd -daemon"),
net_kernel:start([Node, longnames]),
erlang:set_cookie(Node, Cookie),
ok;
Expand Down Expand Up @@ -256,6 +259,7 @@ load_source_files(Dir) ->
case compile:file(F, [report, binary]) of
{ok, Mod, Bin} ->
{module, Mod} = code:load_binary(Mod, F, Bin),
deploy_module(Mod),
?INFO("Loaded ~p (~s)\n", [Mod, F]),
ok;
Error ->
Expand All @@ -276,6 +280,70 @@ run_hook({Module, Function}) ->
run_hook(no_op) ->
no_op.

get_addr_args() ->
{ok, IfAddrs} = inet:getifaddrs(),
FlattAttrib = lists:flatten([IfAttrib || {_Ifname, IfAttrib} <- IfAddrs]),
Addrs = proplists:get_all_values(addr, FlattAttrib),
% If inet:ntoa is unavailable, it probably means that you're running <R16
StrAddrs = [inet:ntoa(Addr) || Addr <- Addrs],
string:join(StrAddrs, " ").
setup_distributed_work() ->
case node() of
'nonode@nohost' ->
?STD_ERR("Basho bench not started in distributed mode, and distribute_work = true~n", []),
halt(1);
_ -> ok
end,
{ok, _Pid} = erl_boot_server:start([]),
%% Allow anyone to boot from me...I might want to lock this down this down at some point
erl_boot_server:add_subnet({0,0,0,0}, {0,0,0,0}),
%% This is cheating, horribly, but it's the only simple way to bypass net_adm:host_file()
gen_server:start({global, pool_master}, pool, [], []),
RemoteSpec = basho_bench_config:get(remote_nodes, []),
Cookie = lists:flatten(erlang:atom_to_list(erlang:get_cookie())),
Args = "-setcookie " ++ Cookie ++ " -loader inet -hosts " ++ get_addr_args(),
Slaves = [ slave:start_link(Host, Name, Args) || {Host, Name} <- RemoteSpec],
SlaveNames = [SlaveName || {ok, SlaveName} <- Slaves],
[pool:attach(SlaveName) || SlaveName <- SlaveNames],
CodePaths = code:get_path(),
rpc:multicall(SlaveNames, code, set_path, [CodePaths]),
Apps = [lager, basho_bench, getopt, bear, folsom, ibrowse, riakc, riak_pb, mochiweb, protobuffs, velvet, goldrush],
[distribute_app(App) || App <- Apps].


deploy_module(Module) ->
case basho_bench_config:get(distribute_work, false) of
true ->
Nodes = nodes(),
{Module, Binary, Filename} = code:get_object_code(Module),
rpc:multicall(Nodes, code, load_binary, [Module, Filename, Binary]);
false -> ok
end.

distribute_app(App) ->
% :(. This is super hackish, it depends on a bunch of assumptions
% But, unfortunately there are negative interactions with escript and slave nodes
CodeExtension = code:objfile_extension(),
LibDir = code:lib_dir(App),
% Get what paths are in the code path that start with LibDir
LibDirLen = string:len(LibDir),
EbinsDir = lists:filter(fun(CodePathDir) -> string:substr(CodePathDir, 1, LibDirLen) == LibDir end, code:get_path()),
StripEndFun = fun(Path) ->
PathLen = string:len(Path),
case string:substr(Path, PathLen - string:len(CodeExtension) + 1, string:len(Path)) of
CodeExtension ->
{true, string:substr(Path, 1, PathLen - string:len(CodeExtension))};
_ -> false
end
end,
EbinDirDistributeFun = fun(EbinDir) ->
{ok, Beams} = erl_prim_loader:list_dir(EbinDir),
Modules = lists:filtermap(StripEndFun, Beams),
ModulesLoaded = [code:load_abs(filename:join(EbinDir, ModFileName)) || ModFileName <- Modules],
lists:foreach(fun({module, Module}) -> deploy_module(Module) end, ModulesLoaded)
end,
lists:foreach(EbinDirDistributeFun, EbinsDir),
ok.
%% just a utility, should be in basho_bench_utils.erl
%% but 's' is for multiple utilities, and so far this
%% is the only one.
Expand Down
2 changes: 2 additions & 0 deletions src/basho_bench_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ stop_or_kill() ->
%%===================================================================

start(_StartType, _StartArgs) ->
%% TODO: Move into a proper supervision tree, janky for now
basho_bench_config:start_link(),
{ok, Pid} = basho_bench_sup:start_link(),
application:set_env(basho_bench_app, is_running, true),
ok = basho_bench_stats:run(),
Expand Down
125 changes: 93 additions & 32 deletions src/basho_bench_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,63 @@
%%
%% -------------------------------------------------------------------
-module(basho_bench_config).
-behaviour(gen_server).


-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-endif.

-export([load/1,
normalize_ips/2,
set/2,
get/1, get/2]).

-export([start_link/0]).

% Gen server callbacks
-export([code_change/3, init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2]).

-include("basho_bench.hrl").

-record(basho_bench_config_state, {}).

-type state() :: #basho_bench_config_state{}.
%% ===================================================================
%% Public API
%% ===================================================================

%% Todo: ensure_started before calling on any gen_server APIs.
ensure_started() ->
start_link().

start_link() ->
gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).


load(Files) ->
TermsList =
[ case file:consult(File) of
{ok, Terms} ->
Terms;
{error, Reason} ->
?FAIL_MSG("Failed to parse config file ~s: ~p\n", [File, Reason])
end || File <- Files ],
load_config(lists:append(TermsList)).
ensure_started(),
gen_server:call({global, ?MODULE}, {load_files, Files}).

set(Key, Value) ->
gen_server:call({global, ?MODULE}, {set, Key, Value}).

get(Key) ->
case gen_server:call({global, ?MODULE}, {get, Key}) of
{ok, Value} ->
Value;
undefined ->
erlang:error("Missing configuration key", [Key])
end.

get(Key, Default) ->
case gen_server:call({global, ?MODULE}, {get, Key}) of
{ok, Value} ->
Value;
undefined ->
Default
end.

%% @doc Normalize the list of IPs and Ports.
%%
Expand All @@ -58,42 +94,67 @@ normalize_ips(IPs, DefultPort) ->
end,
lists:foldl(F, [], IPs).

set(Key, Value) ->
ok = application:set_env(basho_bench, Key, Value).

get(Key) ->
case application:get_env(basho_bench, Key) of
{ok, Value} ->
Value;
undefined ->
erlang:error("Missing configuration key", [Key])
end.

get(Key, Default) ->
case application:get_env(basho_bench, Key) of
{ok, Value} ->
Value;
_ ->
Default
end.


%% ===================================================================
%% Internal functions
%% ===================================================================

load_config([]) ->
ok;
load_config([{Key, Value} | Rest]) ->
?MODULE:set(Key, Value),
load_config(Rest);
load_config([ Other | Rest]) ->
?WARN("Ignoring non-tuple config value: ~p\n", [Other]),
load_config(Rest).

normalize_ip_entry({IP, Ports}, Normalized, _) when is_list(Ports) ->
[{IP, Port} || Port <- Ports] ++ Normalized;
normalize_ip_entry({IP, Port}, Normalized, _) ->
[{IP, Port}|Normalized];
normalize_ip_entry(IP, Normalized, DefaultPort) ->
[{IP, DefaultPort}|Normalized].


%% ===
%% Gen_server Functions
%% ===

-spec init(term()) -> {ok, state()}.
init(_Args) ->
State = #basho_bench_config_state{},
{ok, State}.

-spec code_change(term(), state(), term()) -> {ok, state()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

-spec terminate(term(), state()) -> 'ok'.
terminate(_Reason, _State) ->
ok.

handle_call({load_files, FileNames}, _From, State) ->
set_keys_from_files(FileNames),
{reply, ok, State};

handle_call({set, Key, Value}, _From, State) ->
application:set_env(basho_bench, Key, Value),
{reply, ok, State};
handle_call({get, Key}, _From, State) ->
Value = application:get_env(basho_bench, Key),
{reply, Value, State}.

handle_cast(_Cast, State) ->
{noreply, State}.

handle_info(_Info, State) ->
{noreply, State}.

set_keys_from_files(Files) ->
KVs = [
case file:consult(File) of
{ok, Terms} ->
Terms;
{error, Reason} ->
?FAIL_MSG("Failed to parse config file ~s: ~p\n", [File, Reason]),
throw(invalid_config),
notokay
end || File <- Files ],
FlatKVs = lists:flatten(KVs),
[application:set_env(basho_bench, Key, Value) || {Key, Value} <- FlatKVs].

Loading

0 comments on commit 0faae29

Please sign in to comment.