diff --git a/src/shotgun.erl b/src/shotgun.erl index be2c877..2c6baee 100644 --- a/src/shotgun.erl +++ b/src/shotgun.erl @@ -265,16 +265,18 @@ request(Pid, get, Uri, Headers0, Body, Options) -> async := IsAsync, async_mode := AsyncMode, headers := Headers, - timeout := Timeout} = process_options(Options, Headers0, get), + timeout := Timeout} = process_options(Options, Headers0, get, Body), + + Request = #{uri => Uri, headers => Headers, body => Body}, Event = case IsAsync of - true -> - {get_async, - {HandleEvent, AsyncMode}, - {Uri, Headers, Body}}; - false -> - {get, {Uri, Headers, Body}} + true -> { get_async + , {HandleEvent, AsyncMode} + , Request + }; + false -> {get, Request} end, + gen_fsm:sync_send_event(Pid, Event, Timeout) catch _:Reason -> {error, Reason} @@ -282,9 +284,13 @@ request(Pid, get, Uri, Headers0, Body, Options) -> request(Pid, Method, Uri, Headers0, Body, Options) -> try check_uri(Uri), - #{headers := Headers, timeout := Timeout} = - process_options(Options, Headers0, Method), - Event = {Method, {Uri, Headers, Body}}, + #{ headers := Headers + , timeout := Timeout + } = process_options(Options, Headers0, Method, Body), + + Request = #{uri => Uri, headers => Headers, body => Body}, + Event = {Method, Request}, + gen_fsm:sync_send_event(Pid, Event, Timeout) catch _:Reason -> {error, Reason} @@ -349,17 +355,18 @@ init([Host, Port, Type, Opts]) -> Timeout = maps:get(timeout, Opts, 5000), {ok, Pid} = gun:open(Host, Port, GunOpts), case gun:await_up(Pid, Timeout) of - {ok, _} -> - State = clean_state(), - {ok, at_rest, State#{pid => Pid}}; - %The only apparent timeout for gun:open is the connection timeout of the - %underlying transport. So, a timeout message here comes from gun:await_up. - {error, timeout} -> - {stop, gun_open_timeout}; - %gun currently terminates with reason normal if gun:open fails to open - %the requested connection. This bubbles up through gun:await_up. - {error, normal} -> - {stop, gun_open_failed} + {ok, _} -> + State = clean_state(), + {ok, at_rest, State#{pid => Pid}}; + %% The only apparent timeout for gun:open is the connection timeout of + %% the underlying transport. So, a timeout message here comes from + %% gun:await_up. + {error, timeout} -> + {stop, gun_open_timeout}; + %% gun currently terminates with reason normal if gun:open fails to + %% open the requested connection. This bubbles up through gun:await_up. + {error, normal} -> + {stop, gun_open_failed} end. %% @private @@ -422,18 +429,23 @@ at_rest(Event, From, State) -> %% @private -spec wait_response(term(), pid(), term()) -> term(). -wait_response( {data, Data, FinNoFin} - , _From +wait_response({data, Data, FinNoFin} + , From , #{stream := StreamRef, pid := Pid} = State) -> ok = gun:data(Pid, StreamRef, FinNoFin, Data), - {reply, ok, wait_response, State}; + + NewState = State#{from => From}, + case FinNoFin of + fin -> {next_state, wait_response, NewState}; + nofin -> {reply, ok, wait_response, NewState} + end; wait_response(Event, From, State) -> enqueue_work_or_stop(wait_response, Event, From, State). %% @private -spec receive_data(term(), pid(), term()) -> term(). receive_data({data, _, _} = Event, _From, State) -> - unexpected_event_warning(at_rest, Event), + unexpected_event_warning(receive_data, Event), {reply, {error, unexpected}, receive_data, State}; receive_data(Event, From, State) -> enqueue_work_or_stop(receive_data, Event, From, State). @@ -441,91 +453,109 @@ receive_data(Event, From, State) -> %% @private -spec receive_chunk(term(), pid(), term()) -> term(). receive_chunk({data, _, _} = Event, _From, State) -> - unexpected_event_warning(at_rest, Event), + unexpected_event_warning(receive_chunk, Event), {reply, {error, unexpected}, receive_chunk, State}; receive_chunk(Event, From, State) -> enqueue_work_or_stop(receive_chunk, Event, From, State). -%See if we have work. If we do, dispatch. -%If we don't, stay in at_rest. +%% @doc See if we have work. If we do, dispatch. +%% If we don't, stay in at_rest. %% @private -spec at_rest(any(), state()) -> {next_state, atom(), state()}. at_rest(timeout, State) -> - case get_work(State) of - no_work -> - {next_state, at_rest, State}; - {ok, Work, NewState} -> - ok = gen_fsm:send_event(self(), Work), - {next_state, at_rest, NewState} - end; -at_rest({get_async, {HandleEvent, AsyncMode}, Args, From}, + NewState = case get_work(State) of + no_work -> + State; + {ok, Work, State1} -> + ok = gen_fsm:send_event(self(), Work), + State1 + end, + + {next_state, at_rest, NewState}; +at_rest({get_async, {HandleEvent, AsyncMode}, Request, From}, State = #{pid := Pid}) -> - StreamRef = do_http_verb(get, Pid, Args), + StreamRef = do_http_verb(get, Pid, Request), CleanState = clean_state(State), - NewState = CleanState#{ - from => From, - pid => Pid, - stream => StreamRef, - handle_event => HandleEvent, - async => true, - async_mode => AsyncMode - }, + NewState = CleanState#{ from => From + , pid => Pid + , stream => StreamRef + , handle_event => HandleEvent + , async => true + , async_mode => AsyncMode + , request => Request + }, + {next_state, wait_response, NewState}; -at_rest({HttpVerb, {_, _, Body} = Args, From}, State = #{pid := Pid}) -> - StreamRef = do_http_verb(HttpVerb, Pid, Args), +at_rest({HttpVerb, Request, From}, State = #{pid := Pid}) -> + StreamRef = do_http_verb(HttpVerb, Pid, Request), CleanState = clean_state(State), - NewState = CleanState#{ pid => Pid - , stream => StreamRef - , from => From + NewState = CleanState#{ pid => Pid + , stream => StreamRef + , from => From + , request => Request }, - case Body of - body_chunked -> - gen_fsm:send_event(self(), body_chunked); - _ -> ok - end, + {next_state, wait_response, NewState}. %% @private -spec wait_response(term(), term()) -> term(). wait_response({'DOWN', _, _, _, Reason}, _State) -> exit(Reason); +wait_response({gun_response, _Pid, StreamRef, nofin, StatusCode, Headers}, + #{ from := From + , request := #{body := ReqBody} + , stream := StreamRef + , async := Async} = State) -> + io:format("gun_response nofin ~p~n", [StatusCode]), + + Response = #{ status_code => StatusCode + , headers => Headers + , body => undefined + }, + Result = {ok, Response}, + + TransferEncoding = lists:keyfind(<<"transfer-encoding">>, 1, Headers), + NextStateName = + case {TransferEncoding, ReqBody} of + {{_, <<"chunked">>}, _} when Async == true -> + gen_fsm:reply(From, Result), + receive_chunk; + {_, body_chunked} -> + gen_fsm:reply(From, Result), + wait_response; + _ -> + receive_data + end, + io:format("gun_response nofin next_state ~p~n", [NextStateName]), + + { next_state + , NextStateName + , State#{status_code := StatusCode, headers := Headers} + }; wait_response({gun_response, _Pid, _StreamRef, fin, StatusCode, Headers}, #{from := From, async := Async, responses := Responses} = State) -> - Response = #{status_code => StatusCode, headers => Headers}, + io:format("gun_response fin - aync? ~p~n", [Async]), + Response = #{ status_code => StatusCode + , headers => Headers + , body => undefined}, NewResponses = case Async of false -> + io:format("gun_response fin - replying to ~p~n", [From]), gen_fsm:reply(From, {ok, Response}), Responses; true -> - gen_fsm:reply(From, {ok, Response}) + queue:in(Response, Responses) end, + {next_state, at_rest, State#{responses => NewResponses}, 0}; -wait_response({gun_response, _Pid, _StreamRef, nofin, StatusCode, Headers}, - #{from := From, stream := StreamRef, async := Async} = State) -> - StateName = - case lists:keyfind(<<"transfer-encoding">>, 1, Headers) of - {<<"transfer-encoding">>, <<"chunked">>} when Async == true-> - Result = {ok, StreamRef}, - gen_fsm:reply(From, Result), - receive_chunk; - _ -> - receive_data - end, - { next_state - , StateName - , State#{status_code := StatusCode, headers := Headers} - }; wait_response({gun_error, _Pid, _StreamRef, Error}, #{from := From} = State) -> + io:format("gun_error~n"), gen_fsm:reply(From, {error, Error}), {next_state, at_rest, State, 0}; -wait_response(body_chunked, - #{stream := StreamRef, from := From} = State) -> - gen_fsm:reply(From, {ok, StreamRef}), - {next_state, wait_response, State}; wait_response(Event, State) -> {stop, {unexpected, Event}, State}. @@ -536,11 +566,13 @@ receive_data({'DOWN', _, _, _, _Reason}, _State) -> error(incomplete); receive_data({gun_data, _Pid, StreamRef, nofin, Data}, #{stream := StreamRef, data := DataAcc} = State) -> + io:format("gun_data nofin ~p~n", [Data]), NewData = <>, {next_state, receive_data, State#{data => NewData}}; receive_data({gun_data, _Pid, _StreamRef, fin, Data}, #{data := DataAcc, from := From, status_code := StatusCode, headers := Headers} = State) -> + io:format("gun_data fin ~p~n", [Data]), NewData = <>, Result = {ok, #{status_code => StatusCode, headers => Headers, @@ -597,9 +629,16 @@ clean_state(Reqs) -> %% @private -spec do_http_verb(http_verb(), pid(), tuple()) -> reference(). -do_http_verb(Method, Pid, {Uri, Headers, body_chunked}) -> +do_http_verb(Method, Pid, #{body := body_chunked} = Request) -> + #{ uri := Uri + , headers := Headers + } = Request, gun:request(Pid, http_verb_bin(Method), Uri, Headers); -do_http_verb(Method, Pid, {Uri, Headers, Body}) -> +do_http_verb(Method, Pid, Request) -> + #{ uri := Uri + , headers := Headers + , body := Body + } = Request, gun:request(Pid, http_verb_bin(Method), Uri, Headers, Body). -spec http_verb_bin(atom()) -> binary(). @@ -639,8 +678,9 @@ manage_chunk(IsFin, StreamRef, Data, NewState. %% @private -process_options(Options, HeadersMap, HttpVerb) -> - Headers = basic_auth_header(HeadersMap), +process_options(Options, Headers0, HttpVerb, Body) -> + Headers1 = basic_auth_header(Headers0), + Headers = expect_header(Headers1, Body), HandleEvent = maps:get(handle_event, Options, undefined), Async = maps:get(async, Options, false), AsyncMode = maps:get(async_mode, Options, binary), @@ -650,6 +690,7 @@ process_options(Options, HeadersMap, HttpVerb) -> {true, Other} -> throw({async_unsupported, Other}); _ -> ok end, + #{handle_event => HandleEvent, async => Async, async_mode => AsyncMode, @@ -676,6 +717,12 @@ encode_basic_auth([], []) -> encode_basic_auth(Username, Password) -> base64:encode(Username ++ [$: | Password]). +%% @private +expect_header(Headers, body_chunked) -> + [{<<"Expect">>, <<"100-continue">>} | Headers]; +expect_header(Headers, _) -> + Headers. + %% @private sse_events(IsFin, Data, State = #{buffer := Buffer}) -> NewBuffer = <>,