new raw api #240

Merged 1 commit on Feb 7, 2025
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
# Changelog

## Unreleased

- move rows for INSERT from `params` to `statement` for more control (compression, format, query params, etc.)

## 0.3.0 (2025-02-03)

- gracefully handle `connection: closed` response from server https://github.com/plausible/ch/pull/211
101 changes: 55 additions & 46 deletions README.md
@@ -7,51 +7,54 @@ Minimal HTTP [ClickHouse](https://clickhouse.com) client for Elixir.

Used in [Ecto ClickHouse adapter.](https://github.com/plausible/ecto_ch)

### Key features

- RowBinary
- Native query parameters
- Per query settings
- Minimal API

Your ideas are welcome [here.](https://github.com/plausible/ch/issues/82)

## Installation

```elixir
defp deps do
[
{:ch, "~> 0.3.0"}
{:ch, "~> 0.4.0"}
]
end
```

## Usage

#### Start ClickHouse

```sh
# don't forget to stop the container once done
docker run --rm -p 8123:8123 -e CLICKHOUSE_PASSWORD=secret --ulimit nofile=262144:262144 clickhouse/clickhouse-server:latest-alpine
```

#### Start [DBConnection](https://github.com/elixir-ecto/db_connection) pool

```elixir
defaults = [
scheme: "http",
hostname: "localhost",
port: 8123,
database: "default",
database: "default",
settings: [],
pool_size: 1,
timeout: :timer.seconds(15)
]

# note that starting in ClickHouse 25.1.3.23 `default` user doesn't have
# network access by default in the official Docker images
# see https://github.com/ClickHouse/ClickHouse/pull/75259
{:ok, pid} = Ch.start_link(defaults)
custom = [
# note that starting in ClickHouse 25.1.3.23 `default` user doesn't have
# network access by default in the official Docker images
# see https://github.com/ClickHouse/ClickHouse/pull/75259
username: "default",
# this password was provided via `CLICKHOUSE_PASSWORD` to the container above
password: "secret",
]

config = Keyword.merge(defaults, custom)
{:ok, pid} = Ch.start_link(config)
```

#### Select rows

```elixir
{:ok, pid} = Ch.start_link()

{:ok, %Ch.Result{rows: [[0], [1], [2]]}} =
Ch.query(pid, "SELECT * FROM system.numbers LIMIT 3")

@@ -70,70 +73,76 @@ Note on datetime encoding in query parameters:
#### Insert rows

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES (0), (1)")

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({$0:UInt8}), ({$1:UInt32})", [0, 1])

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({a:UInt16}), ({b:UInt64})", %{"a" => 0, "b" => 1})

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) SELECT number FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 2})
```
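
The examples above pass parameters either as a list (referenced as `{$0:Type}`, `{$1:Type}`) or as a map (referenced by name). ClickHouse's HTTP interface accepts such parameters as `param_<name>` URL query values. The following is a minimal sketch of how the two shapes could map onto that naming. `ParamSketch` is a hypothetical module, not part of Ch, and it ignores value encoding details (dates, arrays, escaping) that a real client has to handle.

```elixir
defmodule ParamSketch do
  # Hypothetical helper for illustration only; not Ch's actual encoder.

  # Named params: %{"a" => 0} becomes %{"param_a" => "0"}.
  def to_http_params(params) when is_map(params) do
    Map.new(params, fn {name, value} -> {"param_#{name}", to_string(value)} end)
  end

  # Positional params: [0, 1] becomes %{"param_$0" => "0", "param_$1" => "1"}.
  def to_http_params(params) when is_list(params) do
    params
    |> Enum.with_index()
    |> Map.new(fn {value, index} -> {"param_$#{index}", to_string(value)} end)
  end
end

ParamSketch.to_http_params([0, 1])
# => %{"param_$0" => "0", "param_$1" => "1"}

ParamSketch.to_http_params(%{"a" => 0, "b" => 1})
# => %{"param_a" => "0", "param_b" => "1"}
```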

#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) (efficient)
#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) (efficient)

```elixir
{:ok, pid} = Ch.start_link()
Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64, name String) ENGINE Null")

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
rows = [
[0, "zero"],
[1, "one"],
[2, "two"]
]

types = ["UInt64"]
# or
types = [Ch.Types.u64()]
# or
types = [:u64]
types = ["UInt64", "String"]
# or types = [:u64, :string]
# or types = [Ch.Types.u64(), Ch.Types.string()]

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", [[0], [1]], types: types)
```
sql = [
# note the \n -- ClickHouse uses it to separate SQL from the data
"INSERT INTO ch_demo(id, name) FORMAT RowBinary\n" | Ch.RowBinary.encode_rows(rows, types)
]

Note that RowBinary format encoding requires `:types` option to be provided.
%Ch.Result{num_rows: 3} = Ch.query!(pid, sql)
```
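
To give a rough idea of what follows the `FORMAT RowBinary\n` line on the wire, here is a minimal sketch of RowBinary encoding for the two types used above: `UInt64` is written as 8 little-endian bytes, and `String` as a LEB128 varint byte length followed by the raw bytes. `RowBinarySketch` is a hypothetical module written for illustration; it is not `Ch.RowBinary` and covers only these two types.

```elixir
defmodule RowBinarySketch do
  import Bitwise

  # Hypothetical illustration only; not Ch.RowBinary.

  # UInt64: 8 bytes, little-endian.
  def encode(:u64, n) when is_integer(n) and n >= 0, do: <<n::64-little>>

  # String: varint byte length, then the bytes themselves.
  def encode(:string, s) when is_binary(s), do: [varint(byte_size(s)), s]

  # Rows are encoded one after another, with no separators between values or rows.
  def encode_rows(rows, types) do
    for row <- rows do
      Enum.zip_with(row, types, fn value, type -> encode(type, value) end)
    end
  end

  # Unsigned LEB128: 7 bits per byte, high bit set on all but the last byte.
  def varint(n) when n < 128, do: <<n>>
  def varint(n), do: [<<1::1, n::7>> | varint(n >>> 7)]
end

rows = [[0, "zero"], [1, "one"]]
payload = RowBinarySketch.encode_rows(rows, [:u64, :string])

IO.iodata_to_binary(payload)
# => <<0::64-little, 4, "zero", 1::64-little, 3, "one">>
```

Because the result is iodata, it can be appended to the statement without being flattened first.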

Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something like a type check.
Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats/RowBinaryWithNamesAndTypes), which also sends the column names and types in a header so that the server can check them against the table.

```elixir
sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes"
opts = [names: ["id"], types: ["UInt64"]]
rows = [[0], [1]]
names = ["id", "name"]
types = ["UInt64", "String"]

rows = [
[0, "zero"],
[1, "one"],
[2, "two"]
]

sql = [
"INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n",
Ch.RowBinary.encode_names_and_types(names, types)
| Ch.RowBinary.encode_rows(rows, types)
]

%Ch.Result{num_rows: 2} = Ch.query!(pid, sql, rows, opts)
%Ch.Result{num_rows: 3} = Ch.query!(pid, sql)
```
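
For reference, the `RowBinaryWithNamesAndTypes` header that precedes the rows consists of the column count as a LEB128 varint, then each column name as a length-prefixed string, then each column type as a length-prefixed string. Below is a minimal sketch of that layout. `NamesAndTypesSketch` is a hypothetical module for illustration and is not the implementation of `Ch.RowBinary.encode_names_and_types/2`.

```elixir
defmodule NamesAndTypesSketch do
  import Bitwise

  # Hypothetical illustration only; not Ch.RowBinary.encode_names_and_types/2.
  def encode_header(names, types) when length(names) == length(types) do
    [
      # number of columns
      varint(length(names)),
      # all names first, then all types
      Enum.map(names, &string/1),
      Enum.map(types, &string/1)
    ]
  end

  # Length-prefixed string: varint byte length, then the bytes.
  defp string(s), do: [varint(byte_size(s)), s]

  # Unsigned LEB128.
  defp varint(n) when n < 128, do: <<n>>
  defp varint(n), do: [<<1::1, n::7>> | varint(n >>> 7)]
end

header = NamesAndTypesSketch.encode_header(["id", "name"], ["UInt64", "String"])

IO.iodata_to_binary(header)
# => <<2, 2, "id", 4, "name", 6, "UInt64", 6, "String">>
```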

#### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats)

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n)

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT CSV", csv, encode: false)
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv])
```
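
Since the statement is iodata, the SQL text and the data can be combined into one nested list without concatenating binaries. The short sketch below uses only the standard library (no connection is needed) to show what the combined statement from the CSV example flattens to, including the `\n` that separates the SQL from the data.

```elixir
csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n)
# csv is ["0", ?\n, "1"], which is valid iodata

statement = ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv]

IO.iodata_to_binary(statement)
# => "INSERT INTO ch_demo(id) FORMAT CSV\n0\n1"
```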

#### Insert rows as chunked RowBinary stream

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

stream = Stream.repeatedly(fn -> [:rand.uniform(100)] end)
@@ -150,8 +159,6 @@ This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/
#### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings)

```elixir
{:ok, pid} = Ch.start_link()

settings = [async_insert: 1]

%Ch.Result{rows: [["async_insert", "Bool", "0"]]} =
@@ -208,6 +215,8 @@ Ch.query!(pid, sql, inserted_rows, types: ["Nullable(UInt8)", "Nullable(UInt8)",
Ch.query!(pid, "SELECT b FROM ch_nulls ORDER BY b")
```

Or [`RowBinaryWithDefaults`](https://clickhouse.com/docs/en/interfaces/formats/RowBinaryWithDefaults). TODO.

#### UTF-8 in RowBinary

When decoding [`String`](https://clickhouse.com/docs/en/sql-reference/data-types/string) columns, non-UTF-8 characters are replaced with `�` (U+FFFD). This behaviour is similar to [`toValidUTF8`](https://clickhouse.com/docs/en/sql-reference/functions/string-functions#tovalidutf8) and [JSON format.](https://clickhouse.com/docs/en/interfaces/formats#json)
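
As a rough illustration of this kind of replacement, the sketch below walks a binary and substitutes U+FFFD for every byte that does not start a valid UTF-8 code point. `Utf8Sketch` is a hypothetical module, not Ch's decoder; in particular, it replaces each invalid byte individually, while Ch and `toValidUTF8` may treat runs of invalid bytes differently.

```elixir
defmodule Utf8Sketch do
  # Hypothetical illustration only; not Ch's actual decoder.
  def replace_invalid(binary) when is_binary(binary), do: replace_invalid(binary, [])

  # A valid code point is kept as-is.
  defp replace_invalid(<<char::utf8, rest::binary>>, acc),
    do: replace_invalid(rest, [acc, <<char::utf8>>])

  # Any other byte is replaced with U+FFFD.
  defp replace_invalid(<<_invalid, rest::binary>>, acc),
    do: replace_invalid(rest, [acc, "\uFFFD"])

  defp replace_invalid(<<>>, acc), do: IO.iodata_to_binary(acc)
end

Utf8Sketch.replace_invalid(<<"a", 0xFF, "b">>)
# => "a\uFFFDb"
```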
Empty file added guides/buffer.md
Empty file.
Empty file added guides/compression.md
Empty file.
Empty file added guides/format.md
Empty file.
15 changes: 7 additions & 8 deletions lib/ch.ex
@@ -14,7 +14,7 @@ defmodule Ch do
| {:scheme, String.t()}
| {:hostname, String.t()}
| {:port, :inet.port_number()}
| {:transport_opts, :gen_tcp.connect_option()}
| {:transport_opts, [:gen_tcp.connect_option() | :ssl.client_option()]}
| DBConnection.start_option()

@doc """
@@ -56,11 +56,12 @@ defmodule Ch do
| {:headers, [{String.t(), String.t()}]}
| {:format, String.t()}
| {:types, [String.t() | atom | tuple]}
# TODO remove
| {:encode, boolean}
| {:decode, boolean}
| DBConnection.connection_option()

@type query_param_key :: String.t() | atom
@type query_params :: %{query_param_key => term} | [term]

@doc """
Runs a query and returns the result as `{:ok, %Ch.Result{}}` or
`{:error, Exception.t()}` if there was a database error.
@@ -79,9 +80,8 @@ defmodule Ch do
* [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0)

"""
@spec query(DBConnection.conn(), iodata, params, [query_option]) ::
@spec query(DBConnection.conn(), iodata, Ch.query_params(), [query_option]) ::
{:ok, Result.t()} | {:error, Exception.t()}
when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
def query(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)

@@ -94,15 +94,14 @@ defmodule Ch do
Runs a query and returns the result or raises `Ch.Error` if
there was an error. See `query/4`.
"""
@spec query!(DBConnection.conn(), iodata, params, [query_option]) :: Result.t()
when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
@spec query!(DBConnection.conn(), iodata, Ch.query_params(), [query_option]) :: Result.t()
def query!(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.execute!(conn, query, params, opts)
end

@doc false
@spec stream(DBConnection.t(), iodata, map | [term], [query_option]) :: Ch.Stream.t()
@spec stream(DBConnection.t(), iodata, Ch.query_params(), [query_option]) :: Ch.Stream.t()
def stream(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
%Ch.Stream{conn: conn, query: query, params: params, opts: opts}
46 changes: 0 additions & 46 deletions lib/ch/connection.ex
@@ -212,25 +212,6 @@ defmodule Ch.Connection do
end
end

def handle_execute(%Query{command: :insert} = query, params, opts, conn) do
conn = maybe_reconnect(conn)
{query_params, extra_headers, body} = params

path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)

result =
if is_function(body, 2) do
request_chunked(conn, "POST", path, headers, body, opts)
else
request(conn, "POST", path, headers, body, opts)
end

with {:ok, conn, responses} <- result do
{:ok, query, responses, conn}
end
end

def handle_execute(query, params, opts, conn) do
conn = maybe_reconnect(conn)
{query_params, extra_headers, body} = params
@@ -261,33 +242,6 @@ defmodule Ch.Connection do
end
end

@spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), Keyword.t()) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
def request_chunked(conn, method, path, headers, stream, opts) do
with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream),
{:ok, conn} <- stream_body(conn, ref, stream),
do: receive_full_response(conn, timeout(conn, opts))
end

@spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) ::
{:ok, conn} | {:disconnect, Mint.Types.error(), conn}
defp stream_body(conn, ref, stream) do
result =
stream
|> Stream.concat([:eof])
|> Enum.reduce_while({:ok, conn}, fn
chunk, {:ok, conn} -> {:cont, HTTP.stream_request_body(conn, ref, chunk)}
_chunk, {:error, _conn, _reason} = error -> {:halt, error}
end)

case result do
{:ok, _conn} = ok -> ok
{:error, conn, reason} -> {:disconnect, reason, conn}
end
end

# stacktrace is a bit cleaner with this function inlined
@compile inline: [send_request: 5]
defp send_request(conn, method, path, headers, body) do