[#118] Handle unexpected data messages
jfacorro committed Nov 18, 2015
1 parent 5cd0e5f commit c3c08ad
Showing 1 changed file with 53 additions and 37 deletions.
90 changes: 53 additions & 37 deletions src/shotgun.erl
Expand Up @@ -82,7 +82,7 @@
-type http_verb() :: get | post | head | delete | patch | put | options.
-type uri() :: iodata().
-type headers() :: #{}.
-type body() :: iodata().
-type body() :: iodata() | body_chunked.
-type options() ::
#{ async => boolean()
, async_mode => binary | sse
Expand Down Expand Up @@ -193,37 +193,37 @@ get(Pid, Uri, Headers) ->
%% </li>
%% </ul>
%% @end
-spec get(pid(), iodata(), headers(), options()) -> result().
-spec get(connection(), uri(), headers(), options()) -> result().
get(Pid, Uri, Headers, Options) ->
request(Pid, get, Uri, Headers, [], Options).

%% @doc Performs a <strong>POST</strong> request to <code>Uri</code> using
%% <code>Headers</code> and <code>Body</code> as the content data.
-spec post(pid(), iodata(), headers(), iodata(), options()) -> result().
-spec post(connection(), uri(), headers(), body(), options()) -> result().
post(Pid, Uri, Headers, Body, Options) ->
request(Pid, post, Uri, Headers, Body, Options).

%% @doc Performs a <strong>DELETE</strong> request to <code>Uri</code> using
%% <code>Headers</code>.
-spec delete(pid(), iodata(), headers(), options()) -> result().
-spec delete(connection(), uri(), headers(), options()) -> result().
delete(Pid, Uri, Headers, Options) ->
request(Pid, delete, Uri, Headers, [], Options).

%% @doc Performs a <strong>HEAD</strong> request to <code>Uri</code> using
%% <code>Headers</code>.
-spec head(pid(), iodata(), headers(), options()) -> result().
-spec head(connection(), uri(), headers(), options()) -> result().
head(Pid, Uri, Headers, Options) ->
request(Pid, head, Uri, Headers, [], Options).

%% @doc Performs a <strong>OPTIONS</strong> request to <code>Uri</code> using
%% <code>Headers</code>.
-spec options(pid(), iodata(), headers(), options()) -> result().
-spec options(connection(), uri(), headers(), options()) -> result().
options(Pid, Uri, Headers, Options) ->
request(Pid, options, Uri, Headers, [], Options).

%% @doc Performs a <strong>PATCH</strong> request to <code>Uri</code> using
%% <code>Headers</code> and <code>Body</code> as the content data.
-spec patch(pid(), iodata(), headers(), iodata(), options()) -> result().
-spec patch(connection(), uri(), headers(), body(), options()) -> result().
patch(Pid, Uri, Headers, Body, Options) ->
request(Pid, patch, Uri, Headers, Body, Options).

Expand All @@ -242,7 +242,7 @@ put(Pid, Uri, Headers0, Body, Options) ->
%% @doc Performs a request to <code>Uri</code> using the HTTP method
%% specified by <code>Method</code>, <code>Body</code> as the content data and
%% <code>Headers</code> as the request's headers.
-spec request(pid(), http_verb(), iodata(), headers(), iodata(), options()) ->
-spec request(connection(), http_verb(), uri(), headers(), body(), options()) ->
request(Pid, get, Uri, Headers0, Body, Options) ->
Expand Down Expand Up @@ -277,13 +277,13 @@ request(Pid, Method, Uri, Headers0, Body, Options) ->

%% @doc Send data when the
-spec data(connection(), body()) -> ok.
-spec data(connection(), body()) -> ok | {error, any()}.
data(Conn, Data) ->
data(Conn, Data, nofin).

-spec data(connection(), body(), fin | nofin) -> ok.
-spec data(connection(), body(), fin | nofin) -> ok | {error, any()}.
data(Conn, Data, FinNoFin) ->
gen_fsm:send_event(Conn, {data, Data, FinNoFin}).
gen_fsm:sync_send_event(Conn, {data, Data, FinNoFin}).

%% @doc Returns a list of all received events up to now.
-spec events(Pid :: pid()) -> [{nofin | fin, reference(), binary()}].
Expand Down Expand Up @@ -398,12 +398,45 @@ terminate(_Reason, _StateName, #{pid := Pid} = _State) ->
%% gen_fsm states

%% @private
-spec at_rest(term(), pid(), term()) -> term().
at_rest({data, _, _} = Event, _From, State) ->
unexpected_event_warning(at_rest, Event),
{reply, {error, unexpected}, at_rest, State};
at_rest(Event, From, State) ->
enqueue_work_or_stop(at_rest, Event, From, State).

%% @private
-spec wait_response(term(), pid(), term()) -> term().
wait_response( {data, Data, FinNoFin}
, _From
, #{stream := StreamRef, pid := Pid} = State) ->
ok = gun:data(Pid, StreamRef, FinNoFin, Data),
{reply, ok, wait_response, State};
wait_response(Event, From, State) ->
enqueue_work_or_stop(wait_response, Event, From, State).

%% @private
-spec receive_data(term(), pid(), term()) -> term().
receive_data({data, _, _} = Event, _From, State) ->
unexpected_event_warning(at_rest, Event),
{reply, {error, unexpected}, receive_data, State};
receive_data(Event, From, State) ->
enqueue_work_or_stop(receive_data, Event, From, State).

%% @private
-spec receive_chunk(term(), pid(), term()) -> term().
receive_chunk({data, _, _} = Event, _From, State) ->
unexpected_event_warning(at_rest, Event),
{reply, {error, unexpected}, receive_chunk, State};
receive_chunk(Event, From, State) ->
enqueue_work_or_stop(receive_chunk, Event, From, State).

%See if we have work. If we do, dispatch.
%If we don't, stay in at_rest.
%% @private
-spec at_rest(any(), state()) -> {next_state, atom(), state()}.
at_rest(timeout, State) ->
io:format("at_rest timeout~n"),
case get_work(State) of
no_work ->
{next_state, at_rest, State};
Expand All @@ -424,8 +457,7 @@ at_rest({get_async, {HandleEvent, AsyncMode}, Args, From},
async_mode => AsyncMode
{next_state, wait_response, NewState};
at_rest({HttpVerb, Args, From}, State = #{pid := Pid}) ->
{_, _, Body} = Args,
at_rest({HttpVerb, {_, _, Body} = Args, From}, State = #{pid := Pid}) ->
StreamRef = do_http_verb(HttpVerb, Pid, Args),
CleanState = clean_state(State),
NewState = CleanState#{ pid => Pid
Expand All @@ -439,15 +471,6 @@ at_rest({HttpVerb, Args, From}, State = #{pid := Pid}) ->
{next_state, wait_response, NewState}.

-spec at_rest(term(), pid(), term()) -> term().
at_rest(Event, From, State) ->
enqueue_work_or_stop(at_rest, Event, From, State).

%% @private
-spec wait_response(term(), pid(), term()) -> term().
wait_response(Event, From, State) ->
enqueue_work_or_stop(wait_response, Event, From, State).

%% @private
-spec wait_response(term(), term()) -> term().
wait_response({'DOWN', _, _, _, Reason}, _State) ->
Expand Down Expand Up @@ -489,18 +512,9 @@ wait_response(body_chunked,
#{stream := StreamRef, from := From} = State) ->
gen_fsm:reply(From, {ok, StreamRef}),
{next_state, wait_response, State};
wait_response({data, Data, FinNoFin},
#{stream := StreamRef, pid := Pid} = State) ->
ok = gun:data(Pid, StreamRef, FinNoFin, Data),
{next_state, wait_response, State};
wait_response(Event, State) ->
{stop, {unexpected, Event}, State}.

%% @private
-spec receive_data(term(), pid(), term()) -> term().
receive_data(Event, From, State) ->
enqueue_work_or_stop(receive_data, Event, From, State).

%% @private
%% @doc Regular response
-spec receive_data(term(), term()) -> term().
Expand All @@ -524,11 +538,6 @@ receive_data({gun_error, _Pid, StreamRef, _Reason},
#{stream := StreamRef} = State) ->
{next_state, at_rest, State, 0}.

%% @private
-spec receive_chunk(term(), pid(), term()) -> term().
receive_chunk(Event, From, State) ->
enqueue_work_or_stop(receive_chunk, Event, From, State).

%% @private
%% @doc Chunked data response
-spec receive_chunk(term(), term()) -> term().
Expand Down Expand Up @@ -725,3 +734,10 @@ append_work(Work, State) ->
%% @private
get_pending_reqs(State) ->
maps:get(pending_requests, State).

%% @private
-spec unexpected_event_warning(atom(), any()) -> ok.
unexpected_event_warning(StateName, Event) ->
error_logger:warning_msg( "Unexpected event in state '~p': ~p~n"
, [StateName, Event]

