Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move data to sql #143

Merged
merged 1 commit into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## Unreleased

- move rows payload (RowBinary, CSV, etc.) to SQL statement
- remove pseudo-positional binds, make param names explicit

## 0.2.2 (2023-12-23)

- fix query encoding for datetimes with zeroed microseconds `~U[****-**-** **:**:**.000000]` https://github.com/plausible/ch/pull/138
Expand Down
92 changes: 54 additions & 38 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
Ch.query!(pid, "INSERT INTO ch_demo(id) SELECT number FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 2})
```

#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) (efficient)
#### Insert [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary)

```elixir
{:ok, pid} = Ch.start_link()
Expand All @@ -98,20 +98,26 @@ types = [Ch.Types.u64()]
# or
types = [:u64]

rows = [[0], [1]]
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", [[0], [1]], types: types)
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT RowBinary\n" | row_binary])
```

Note that RowBinary format encoding requires `:types` option to be provided.

Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes), which additionally sends the column names and types in a header so that ClickHouse can check them against the target table.

```elixir
sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes"
opts = [names: ["id"], types: ["UInt64"]]
rows = [[0], [1]]
names = ["id"]
types = ["UInt64"]

header = Ch.RowBinary.encode_names_and_types(names, types)
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{num_rows: 2} = Ch.query!(pid, sql, rows, opts)
%Ch.Result{num_rows: 2} =
Ch.query!(pid, ["INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n", header | row_binary])
```

#### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats)
Expand All @@ -121,29 +127,42 @@ rows = [[0], [1]]

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n)
csv = "0\n1"

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT CSV", csv, encode: false)
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv])
```

#### Insert rows as chunked RowBinary stream
#### Insert rows as [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

stream = Stream.repeatedly(fn -> [:rand.uniform(100)] end)
chunked = Stream.chunk_every(stream, 100)
encoded = Stream.map(chunked, fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
ten_encoded_chunks = Stream.take(encoded, 10)
row_binary =
Stream.repeatedly(fn -> [:rand.uniform(100)] end)
|> Stream.chunk_every(100_000)
|> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(10)

%Ch.Result{num_rows: 1000} =
Ch.query(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", ten_encoded_chunks, encode: false)
%Ch.Result{num_rows: 1_000_000} =
Ch.query!(pid, Stream.concat(["INSERT INTO ch_demo(id) FORMAT RowBinary\n"], row_binary))
```

This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) HTTP request while unfolding the stream resulting in lower memory usage.
#### Insert rows via [input](https://clickhouse.com/docs/en/sql-reference/table-functions/input) function

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

sql = "INSERT INTO ch_demo SELECT id + {ego:Int64} FROM input('id UInt64') FORMAT RowBinary\n"
row_binary = Ch.RowBinary.encode_rows([[1], [2], [3]], ["UInt64"])

%Ch.Result{num_rows: 3} =
Ch.query!(pid, [sql | row_binary], %{"ego" => -1})
```

#### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings)

Expand All @@ -156,7 +175,7 @@ settings = [async_insert: 1]
Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'")

%Ch.Result{rows: [["async_insert", "Bool", "1"]]} =
Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", [], settings: settings)
Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", _params = [], settings: settings)
```

## Caveats
Expand All @@ -179,13 +198,13 @@ CREATE TABLE ch_nulls (
""")

types = ["Nullable(UInt8)", "UInt8", "UInt8"]
inserted_rows = [[nil, nil, nil]]
selected_rows = [[nil, 0, 0]]
rows = [[nil, nil, nil]]
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{num_rows: 1} =
Ch.query!(pid, "INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary", inserted_rows, types: types)
Ch.query!(pid, ["INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary\n" | row_binary])

%Ch.Result{rows: ^selected_rows} =
%Ch.Result{rows: [[nil, 0, 0]]} =
Ch.query!(pid, "SELECT * FROM ch_nulls")
```

Expand All @@ -197,13 +216,17 @@ However, [`input()`](https://clickhouse.com/docs/en/sql-reference/table-function
sql = """
INSERT INTO ch_nulls
SELECT * FROM input('a Nullable(UInt8), b Nullable(UInt8), c UInt8')
FORMAT RowBinary\
FORMAT RowBinary
"""

Ch.query!(pid, sql, inserted_rows, types: ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"])
types = ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"]
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{num_rows: 1} =
Ch.query!(pid, [sql | row_binary])

%Ch.Result{rows: [[0], [10]]} =
Ch.query!(pid, "SELECT b FROM ch_nulls ORDER BY b")
%Ch.Result{rows: [_before = [nil, 0, 0], _after = [nil, 10, 0]]} =
Ch.query!(pid, "SELECT * FROM ch_nulls ORDER BY b")
```

#### UTF-8 in RowBinary
Expand All @@ -215,26 +238,19 @@ When decoding [`String`](https://clickhouse.com/docs/en/sql-reference/data-types

Ch.query!(pid, "CREATE TABLE ch_utf8(str String) ENGINE Memory")

bin = "\x61\xF0\x80\x80\x80b"
utf8 = "a�b"
# "\x61\xF0\x80\x80\x80b" will become "a�b" on SELECT
row_binary = Ch.RowBinary.encode(:string, "\x61\xF0\x80\x80\x80b")

%Ch.Result{num_rows: 1} =
Ch.query!(pid, "INSERT INTO ch_utf8(str) FORMAT RowBinary", [[bin]], types: ["String"])
Ch.query!(pid, ["INSERT INTO ch_utf8(str) FORMAT RowBinary\n" | row_binary])

%Ch.Result{rows: [[^utf8]]} =
%Ch.Result{rows: [["a�b"]]} =
Ch.query!(pid, "SELECT * FROM ch_utf8")

%Ch.Result{rows: %{"data" => [[^utf8]]}} =
%Ch.Result{rows: %{"data" => [["a�b"]]}} =
pid |> Ch.query!("SELECT * FROM ch_utf8 FORMAT JSONCompact") |> Map.update!(:rows, &Jason.decode!/1)
```

To get raw binary from `String` columns use `:binary` type that skips UTF-8 checks.

```elixir
%Ch.Result{rows: [[^bin]]} =
Ch.query!(pid, "SELECT * FROM ch_utf8", [], types: [:binary])
```

#### Timezones in RowBinary

Decoding non-UTC datetimes like `DateTime('Asia/Taipei')` requires a [timezone database.](https://hexdocs.pm/elixir/DateTime.html#module-time-zone-database)
Expand Down Expand Up @@ -268,7 +284,7 @@ utc = DateTime.utc_now()
taipei = DateTime.shift_zone!(utc, "Asia/Taipei")

# ** (ArgumentError) non-UTC timezones are not supported for encoding: 2023-04-26 01:49:43.044569+08:00 CST Asia/Taipei
Ch.query!(pid, "INSERT INTO ch_datetimes(datetime) FORMAT RowBinary", [[naive], [utc], [taipei]], types: ["DateTime"])
Ch.RowBinary.encode_rows([[naive], [utc], [taipei]], ["DateTime"])
```

## Benchmarks
Expand Down
59 changes: 44 additions & 15 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,85 @@ defmodule Ch do
@moduledoc "Minimal HTTP ClickHouse client."
alias Ch.{Connection, Query, Result}

@type common_option ::
{:database, String.t()}
| {:username, String.t()}
| {:password, String.t()}
| {:settings, Keyword.t()}
| {:timeout, timeout}

@type start_option ::
common_option
| {:scheme, String.t()}
| {:hostname, String.t()}
| {:port, :inet.port_number()}
| {:transport_opts, :gen_tcp.connect_option()}
| DBConnection.start_option()

@doc """
Start the connection process and connect to ClickHouse.

## Options

* `:scheme` - HTTP scheme, defaults to `"http"`
* `:hostname` - server hostname, defaults to `"localhost"`
* `:port` - HTTP port, defaults to `8123`
* `:scheme` - HTTP scheme, defaults to `"http"`
* `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
* `:database` - Database, defaults to `"default"`
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of ClickHouse settings
* `:timeout` - HTTP receive timeout in milliseconds
* `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
* [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0)

"""
@spec start_link([start_option]) :: GenServer.on_start()
def start_link(opts \\ []) do
DBConnection.start_link(Connection, opts)
end

@doc """
Returns a supervisor child specification for a DBConnection pool.

See `start_link/1` for supported options.
"""
@spec child_spec([start_option]) :: :supervisor.child_spec()
def child_spec(opts) do
DBConnection.child_spec(Connection, opts)
end

# TODO move streaming to Ch.stream/4
@type statement :: iodata | Enumerable.t()
@type params :: %{String.t() => term} | [{String.t(), term}]

@type query_option ::
common_option
| {:command, Ch.Query.command()}
| {:headers, [{String.t(), String.t()}]}
| {:format, String.t()}
| {:decode, boolean}
| DBConnection.connection_option()

@doc """
Runs a query and returns the result as `{:ok, %Ch.Result{}}` or
`{:error, Exception.t()}` if there was a database error.

## Options

* `:timeout` - Query request timeout
* `:settings` - Keyword list of settings
* `:database` - Database
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of settings
* `:timeout` - Query request timeout
* `:command` - Command tag for the query
* `:headers` - Custom HTTP headers for the request
* `:format` - Custom response format for the request
* `:decode` - Whether to automatically decode the response
* [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0)

"""
@spec query(DBConnection.conn(), iodata, params, Keyword.t()) ::
@spec query(DBConnection.conn(), statement, params, [query_option]) ::
{:ok, Result.t()} | {:error, Exception.t()}
when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
def query(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)

Expand All @@ -57,26 +93,19 @@ defmodule Ch do
Runs a query and returns the result or raises `Ch.Error` if
there was an error. See `query/4`.
"""
@spec query!(DBConnection.conn(), iodata, params, Keyword.t()) :: Result.t()
when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
@spec query!(DBConnection.conn(), statement, params, [query_option]) :: Result.t()
def query!(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.execute!(conn, query, params, opts)
end

@doc false
@spec stream(DBConnection.t(), iodata, map | [term], Keyword.t()) :: DBConnection.Stream.t()
@spec stream(DBConnection.t(), statement, params, [query_option]) :: DBConnection.Stream.t()
def stream(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.stream(conn, query, params, opts)
end

@doc false
@spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any
def run(conn, f, opts \\ []) when is_function(f, 1) do
DBConnection.run(conn, f, opts)
end

if Code.ensure_loaded?(Ecto.ParameterizedType) do
@behaviour Ecto.ParameterizedType

Expand Down
39 changes: 21 additions & 18 deletions lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -127,35 +127,24 @@ defmodule Ch.Connection do
end

@impl true
def handle_execute(%Query{command: :insert} = query, params, opts, conn) do
{query_params, extra_headers, body} = params
def handle_execute(%Query{statement: statement} = query, params, opts, conn) do
{query_params, extra_headers} = params

path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)

result =
if is_function(body, 2) do
request_chunked(conn, "POST", path, headers, body, opts)
if is_list(statement) or is_binary(statement) do
request(conn, "POST", path, headers, statement, opts)
else
request(conn, "POST", path, headers, body, opts)
request_chunked(conn, "POST", path, headers, statement, opts)
end

with {:ok, conn, responses} <- result do
{:ok, query, responses, conn}
end
end

def handle_execute(query, params, opts, conn) do
{query_params, extra_headers, body} = params

path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)

with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do
{:ok, query, responses, conn}
end
end

@impl true
def disconnect(_error, conn) do
{:ok = ok, _conn} = HTTP.close(conn)
Expand All @@ -164,7 +153,14 @@ defmodule Ch.Connection do

@typep response :: Mint.Types.status() | Mint.Types.headers() | binary

@spec request(conn, binary, binary, Mint.Types.headers(), iodata, Keyword.t()) ::
@spec request(
conn,
method :: String.t(),
path :: String.t(),
Mint.Types.headers(),
body :: iodata,
[Ch.query_option()]
) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
Expand All @@ -174,7 +170,14 @@ defmodule Ch.Connection do
end
end

@spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), Keyword.t()) ::
@spec request_chunked(
conn,
method :: String.t(),
path :: String.t(),
Mint.Types.headers(),
body :: Enumerable.t(),
[Ch.query_option()]
) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
Expand Down
Loading