Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Mint timeout #241

Merged
merged 1 commit into from
Feb 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ defmodule Ch do
| {:username, String.t()}
| {:password, String.t()}
| {:settings, Keyword.t()}
| {:timeout, timeout}

@type start_option ::
common_option
| {:scheme, String.t()}
| {:hostname, String.t()}
| {:port, :inet.port_number()}
| {:transport_opts, [:gen_tcp.connect_option() | :ssl.client_option()]}
| {:transport_opts, [:gen_tcp.connect_option() | :ssl.tls_client_option()]}
| {:timeout, timeout}
| DBConnection.start_option()

@doc """
Expand All @@ -30,8 +30,8 @@ defmodule Ch do
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of ClickHouse settings
* `:timeout` - HTTP receive timeout in milliseconds
* `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
* `:timeout` - Connection handshake and ping timeout in milliseconds
* [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0)

"""
Expand Down Expand Up @@ -72,7 +72,6 @@ defmodule Ch do
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of settings
* `:timeout` - Query request timeout
* `:command` - Command tag for the query
* `:headers` - Custom HTTP headers for the request
* `:format` - Custom response format for the request
Expand Down
40 changes: 22 additions & 18 deletions lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@ defmodule Ch.Connection do
def connect(opts) do
with {:ok, conn} <- do_connect(opts) do
handshake = Query.build("select 1")
params = DBConnection.Query.encode(handshake, _params = [], _opts = [])

case handle_execute(handshake, params, _opts = [], conn) do
{:ok, handshake, responses, conn} ->
{query_params, extra_headers, body} =
DBConnection.Query.encode(handshake, _params = [], _opts = [])

path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)
timeout = HTTP.get_private(conn, :timeout) || :timer.seconds(15)

case request(conn, "POST", path, headers, body, timeout) do
{:ok, conn, responses} ->
case DBConnection.Query.decode(handshake, responses, _opts = []) do
%Result{rows: [[1]]} ->
{:ok, conn}
Expand All @@ -44,8 +50,9 @@ defmodule Ch.Connection do
def ping(conn) do
conn = maybe_reconnect(conn)
headers = [{"user-agent", @user_agent}]
timeout = HTTP.get_private(conn, :timeout) || :timer.seconds(5)

case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do
case request(conn, "GET", "/ping", headers, _body = [], timeout) do
{:ok, conn, _response} -> {:ok, conn}
{:error, error, conn} -> {:disconnect, error, conn}
{:disconnect, _error, _conn} = disconnect -> disconnect
Expand Down Expand Up @@ -88,7 +95,7 @@ defmodule Ch.Connection do
headers = headers(conn, extra_headers, opts)

with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body),
{:ok, conn} <- eat_ok_status_and_headers(conn, timeout(conn, opts)) do
{:ok, conn} <- eat_ok_status_and_headers(conn, :infinity) do
{:ok, query, %Result{command: command}, conn}
end
end
Expand Down Expand Up @@ -148,8 +155,8 @@ defmodule Ch.Connection do
end
end

def handle_fetch(_query, result, opts, conn) do
case HTTP.recv(conn, 0, timeout(conn, opts)) do
def handle_fetch(_query, result, _opts, conn) do
case HTTP.recv(conn, 0, :infinity) do
{:ok, conn, responses} ->
{halt_or_cont(responses), %Result{result | data: extract_data(responses)}, conn}

Expand Down Expand Up @@ -194,12 +201,12 @@ defmodule Ch.Connection do
end
end

def handle_execute(%Query{} = query, {:stream, ref, body}, opts, conn) do
def handle_execute(%Query{} = query, {:stream, ref, body}, _opts, conn) do
case HTTP.stream_request_body(conn, ref, body) do
{:ok, conn} ->
case body do
:eof ->
with {:ok, conn, responses} <- receive_full_response(conn, timeout(conn, opts)) do
with {:ok, conn, responses} <- receive_full_response(conn, :infinity) do
{:ok, query, responses, conn}
end

Expand All @@ -219,7 +226,7 @@ defmodule Ch.Connection do
path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)

with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do
with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, :infinity) do
{:ok, query, responses, conn}
end
end
Expand All @@ -232,13 +239,13 @@ defmodule Ch.Connection do

@typep response :: Mint.Types.status() | Mint.Types.headers() | binary

@spec request(conn, binary, binary, Mint.Types.headers(), iodata, [Ch.query_option()]) ::
@spec request(conn, binary, binary, Mint.Types.headers(), iodata, timeout) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
defp request(conn, method, path, headers, body, opts) do
defp request(conn, method, path, headers, body, timeout) do
with {:ok, conn, _ref} <- send_request(conn, method, path, headers, body) do
receive_full_response(conn, timeout(conn, opts))
receive_full_response(conn, timeout)
end
end

Expand Down Expand Up @@ -275,7 +282,7 @@ defmodule Ch.Connection do
end
end

@spec recv_all(conn, [response], timeout()) ::
@spec recv_all(conn, [response], timeout) ::
{:ok, conn, [response]} | {:disconnect, Mint.Types.error(), conn}
defp recv_all(conn, acc, timeout) do
case HTTP.recv(conn, 0, timeout) do
Expand All @@ -302,9 +309,6 @@ defmodule Ch.Connection do
defp maybe_put_private(conn, _k, nil), do: conn
defp maybe_put_private(conn, k, v), do: HTTP.put_private(conn, k, v)

defp timeout(conn), do: HTTP.get_private(conn, :timeout)
defp timeout(conn, opts), do: Keyword.get(opts, :timeout) || timeout(conn)

defp settings(conn, opts) do
default_settings = HTTP.get_private(conn, :settings, [])
opts_settings = Keyword.get(opts, :settings, [])
Expand Down Expand Up @@ -375,7 +379,7 @@ defmodule Ch.Connection do
with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do
conn =
conn
|> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15))
|> maybe_put_private(:timeout, opts[:timeout])
|> maybe_put_private(:database, opts[:database])
|> maybe_put_private(:username, opts[:username])
|> maybe_put_private(:password, opts[:password])
Expand Down
12 changes: 7 additions & 5 deletions test/ch/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1247,13 +1247,15 @@ defmodule Ch.ConnectionTest do
end

describe "options" do
# this test is flaky, sometimes it raises due to ownership timeout
@tag capture_log: true, skip: true
test "can provide custom timeout", %{conn: conn} do
assert {:error, %Mint.TransportError{reason: :timeout} = error} =
Ch.query(conn, "select sleep(1)", _params = [], timeout: 100)
log =
ExUnit.CaptureLog.capture_log([async: true], fn ->
assert {:error, %Mint.TransportError{reason: :closed}} =
Ch.query(conn, "select sleep(1)", _params = [], timeout: 100)
end)

assert Exception.message(error) == "timeout"
assert log =~
"timed out because it queued and checked out the connection for longer than 100ms"
end

test "errors on invalid creds", %{conn: conn} do
Expand Down
15 changes: 10 additions & 5 deletions test/ch/faults_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ defmodule Ch.FaultsTest do

log =
capture_async_log(fn ->
{:ok, conn} = Ch.start_link(port: port, timeout: 100)
{:ok, conn} = Ch.start_link(port: port)

# connect
{:ok, mint} = :gen_tcp.accept(listen)
Expand All @@ -264,8 +264,8 @@ defmodule Ch.FaultsTest do
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))

spawn_link(fn ->
assert {:error, %Mint.TransportError{reason: :timeout}} =
Ch.query(conn, "select 1 + 1")
assert {:error, %Mint.TransportError{reason: :closed}} =
Ch.query(conn, "select 1 + 1", [], timeout: 100)
end)

# failed select 1 + 1
Expand All @@ -280,7 +280,9 @@ defmodule Ch.FaultsTest do
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))

spawn_link(fn ->
assert {:ok, %{num_rows: 1, rows: [[2]]}} = Ch.query(conn, "select 1 + 1")
assert {:ok, %{num_rows: 1, rows: [[2]]}} =
Ch.query(conn, "select 1 + 1", [], timeout: 100)

send(test, :done)
end)

Expand All @@ -291,7 +293,10 @@ defmodule Ch.FaultsTest do
assert_receive :done
end)

assert log =~ "disconnected: ** (Mint.TransportError) timeout"
assert log =~ "disconnected: ** (DBConnection.ConnectionError)"

assert log =~
"timed out because it queued and checked out the connection for longer than 100ms"
end

test "reconnects after closed on response", ctx do
Expand Down