Skip to content

Commit

Permalink
Merge pull request #240 from plausible/raw-api
Browse files Browse the repository at this point in the history
new raw api
  • Loading branch information
ruslandoga authored Feb 7, 2025
2 parents 0a8ca00 + 790de76 commit 1275855
Show file tree
Hide file tree
Showing 16 changed files with 414 additions and 381 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## Unreleased

- move rows for INSERT from `params` to `statement` for more control (compression, format, query params, etc.)

## 0.3.0 (2025-02-03)

- gracefully handle `connection: closed` response from server https://github.com/plausible/ch/pull/211
Expand Down
101 changes: 55 additions & 46 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,51 +7,54 @@ Minimal HTTP [ClickHouse](https://clickhouse.com) client for Elixir.

Used in [Ecto ClickHouse adapter.](https://github.com/plausible/ecto_ch)

### Key features

- RowBinary
- Native query parameters
- Per query settings
- Minimal API

Your ideas are welcome [here.](https://github.com/plausible/ch/issues/82)

## Installation

```elixir
defp deps do
[
{:ch, "~> 0.3.0"}
{:ch, "~> 0.4.0"}
]
end
```

## Usage

#### Start ClickHouse

```sh
# don't forget to stop the container once done
docker run --rm -p 8123:8123 -e CLICKHOUSE_PASSWORD=secret --ulimit nofile=262144:262144 clickhouse/clickhouse-server:latest-alpine
```

#### Start [DBConnection](https://github.com/elixir-ecto/db_connection) pool

```elixir
defaults = [
scheme: "http",
hostname: "localhost",
port: 8123,
database: "default",
database: "default",
settings: [],
pool_size: 1,
timeout: :timer.seconds(15)
]

# note that starting in ClickHouse 25.1.3.23 `default` user doesn't have
# network access by default in the official Docker images
# see https://github.com/ClickHouse/ClickHouse/pull/75259
{:ok, pid} = Ch.start_link(defaults)
custom = [
# note that starting in ClickHouse 25.1.3.23 `default` user doesn't have
# network access by default in the official Docker images
# see https://github.com/ClickHouse/ClickHouse/pull/75259
username: "default",
# this password was provided via `CLICKHOUSE_PASSWORD` to the container above
password: "secret"
]

config = Keyword.merge(defaults, custom)
{:ok, pid} = Ch.start_link(config)
```

#### Select rows

```elixir
{:ok, pid} = Ch.start_link()

{:ok, %Ch.Result{rows: [[0], [1], [2]]}} =
Ch.query(pid, "SELECT * FROM system.numbers LIMIT 3")

Expand All @@ -70,70 +73,76 @@ Note on datetime encoding in query parameters:
#### Insert rows

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES (0), (1)")

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({$0:UInt8}), ({$1:UInt32})", [0, 1])

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({a:UInt16}), ({b:UInt64})", %{"a" => 0, "b" => 1})

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) SELECT number FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 2})
```

#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) (efficient)
#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) (efficient)

```elixir
{:ok, pid} = Ch.start_link()
Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64, name String) ENGINE Null")

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
rows = [
[0, "zero"],
[1, "one"],
[2, "two"]
]

types = ["UInt64"]
# or
types = [Ch.Types.u64()]
# or
types = [:u64]
types = ["UInt64", "String"]
# or types = [:u64, :string]
# or types = [Ch.Types.u64(), Ch.Types.string()]

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", [[0], [1]], types: types)
```
sql = [
# note the \n -- ClickHouse uses it to separate SQL from the data
"INSERT INTO ch_demo(id, name) FORMAT RowBinary\n" | Ch.RowBinary.encode_rows(rows, types)
]

Note that RowBinary format encoding requires `:types` option to be provided.
%Ch.Result{num_rows: 3} = Ch.query!(pid, sql)
```

Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something like a type check.
Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats/RowBinaryWithNamesAndTypes) which would additionally do something like a type check.

```elixir
sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes"
opts = [names: ["id"], types: ["UInt64"]]
rows = [[0], [1]]
names = ["id", "name"]
types = ["UInt64", "String"]

rows = [
[0, "zero"],
[1, "one"],
[2, "two"]
]

sql = [
"INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n",
Ch.RowBinary.encode_names_and_types(names, types)
| Ch.RowBinary.encode_rows(rows, types)
]

%Ch.Result{num_rows: 2} = Ch.query!(pid, sql, rows, opts)
%Ch.Result{num_rows: 3} = Ch.query!(pid, sql)
```

#### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats)

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n)

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT CSV", csv, encode: false)
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv])
```

#### Insert rows as chunked RowBinary stream

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

stream = Stream.repeatedly(fn -> [:rand.uniform(100)] end)
Expand All @@ -150,8 +159,6 @@ This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/
#### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings)

```elixir
{:ok, pid} = Ch.start_link()

settings = [async_insert: 1]

%Ch.Result{rows: [["async_insert", "Bool", "0"]]} =
Expand Down Expand Up @@ -208,6 +215,8 @@ Ch.query!(pid, sql, inserted_rows, types: ["Nullable(UInt8)", "Nullable(UInt8)",
Ch.query!(pid, "SELECT b FROM ch_nulls ORDER BY b")
```

Or [`RowBinaryWithDefaults`](https://clickhouse.com/docs/en/interfaces/formats/RowBinaryWithDefaults). TODO.

#### UTF-8 in RowBinary

When decoding [`String`](https://clickhouse.com/docs/en/sql-reference/data-types/string) columns non UTF-8 characters are replaced with `` (U+FFFD). This behaviour is similar to [`toValidUTF8`](https://clickhouse.com/docs/en/sql-reference/functions/string-functions#tovalidutf8) and [JSON format.](https://clickhouse.com/docs/en/interfaces/formats#json)
Expand Down
Empty file added guides/buffer.md
Empty file.
Empty file added guides/compression.md
Empty file.
Empty file added guides/format.md
Empty file.
15 changes: 7 additions & 8 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Ch do
| {:scheme, String.t()}
| {:hostname, String.t()}
| {:port, :inet.port_number()}
| {:transport_opts, :gen_tcp.connect_option()}
| {:transport_opts, [:gen_tcp.connect_option() | :ssl.client_option()]}
| DBConnection.start_option()

@doc """
Expand Down Expand Up @@ -56,11 +56,12 @@ defmodule Ch do
| {:headers, [{String.t(), String.t()}]}
| {:format, String.t()}
| {:types, [String.t() | atom | tuple]}
# TODO remove
| {:encode, boolean}
| {:decode, boolean}
| DBConnection.connection_option()

@type query_param_key :: String.t() | atom
@type query_params :: %{query_param_key => term} | [term]

@doc """
Runs a query and returns the result as `{:ok, %Ch.Result{}}` or
`{:error, Exception.t()}` if there was a database error.
Expand All @@ -79,9 +80,8 @@ defmodule Ch do
* [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0)
"""
@spec query(DBConnection.conn(), iodata, params, [query_option]) ::
@spec query(DBConnection.conn(), iodata, Ch.query_params(), [query_option]) ::
{:ok, Result.t()} | {:error, Exception.t()}
when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
def query(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)

Expand All @@ -94,15 +94,14 @@ defmodule Ch do
Runs a query and returns the result or raises `Ch.Error` if
there was an error. See `query/4`.
"""
@spec query!(DBConnection.conn(), iodata, params, [query_option]) :: Result.t()
when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
@spec query!(DBConnection.conn(), iodata, Ch.query_params(), [query_option]) :: Result.t()
def query!(conn, statement, params \\ [], opts \\ []) do
  # Build the query from the statement and options, then pass it to
  # DBConnection.execute!/4 together with the params and the same options.
  DBConnection.execute!(conn, Query.build(statement, opts), params, opts)
end

@doc false
@spec stream(DBConnection.t(), iodata, map | [term], [query_option]) :: Ch.Stream.t()
@spec stream(DBConnection.t(), iodata, Ch.query_params(), [query_option]) :: Ch.Stream.t()
def stream(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
%Ch.Stream{conn: conn, query: query, params: params, opts: opts}
Expand Down
46 changes: 0 additions & 46 deletions lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -212,25 +212,6 @@ defmodule Ch.Connection do
end
end

# INSERT clause: POSTs the request body to the server.
#
# `params` must be a `{query_params, extra_headers, body}` tuple. When `body`
# is a 2-arity function it is sent via `request_chunked/6`; any other body is
# sent via `request/6`.
#
# Returns `{:ok, query, responses, conn}` on success. Any other result from
# the request helpers is returned unchanged.
def handle_execute(%Query{command: :insert} = query, params, opts, conn) do
  conn = maybe_reconnect(conn)
  {query_params, extra_headers, body} = params

  request_path = path(conn, query_params, opts)
  request_headers = headers(conn, extra_headers, opts)

  outcome =
    case body do
      chunks when is_function(chunks, 2) ->
        request_chunked(conn, "POST", request_path, request_headers, chunks, opts)

      payload ->
        request(conn, "POST", request_path, request_headers, payload, opts)
    end

  case outcome do
    {:ok, conn, responses} -> {:ok, query, responses, conn}
    failure -> failure
  end
end

def handle_execute(query, params, opts, conn) do
conn = maybe_reconnect(conn)
{query_params, extra_headers, body} = params
Expand Down Expand Up @@ -261,33 +242,6 @@ defmodule Ch.Connection do
end
end

# Sends a request whose body is streamed from `stream`, then waits for the
# full response.
#
# The request is opened with a `:stream` body, the chunks are written by
# `stream_body/3`, and the response is read with the timeout computed from
# `conn` and `opts`. A failure in either of the first two steps is returned
# unchanged.
@spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), Keyword.t()) ::
        {:ok, conn, [response]}
        | {:error, Error.t(), conn}
        | {:disconnect, Mint.Types.error(), conn}
def request_chunked(conn, method, path, headers, stream, opts) do
  case send_request(conn, method, path, headers, :stream) do
    {:ok, conn, ref} ->
      case stream_body(conn, ref, stream) do
        {:ok, conn} -> receive_full_response(conn, timeout(conn, opts))
        failure -> failure
      end

    failure ->
      failure
  end
end

# Writes every chunk of `stream`, followed by a final `:eof`, to the request
# identified by `ref`.
#
# The accumulator is whatever `HTTP.stream_request_body/3` returned for the
# previous chunk. Once that is an error tuple, the next chunk halts the
# reduction. An error is reported as `{:disconnect, reason, conn}`.
@spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) ::
        {:ok, conn} | {:disconnect, Mint.Types.error(), conn}
defp stream_body(conn, ref, stream) do
  stream
  |> Stream.concat([:eof])
  |> Enum.reduce_while({:ok, conn}, fn
    chunk, {:ok, current} -> {:cont, HTTP.stream_request_body(current, ref, chunk)}
    _chunk, {:error, _conn, _reason} = failure -> {:halt, failure}
  end)
  |> case do
    {:ok, _conn} = done -> done
    {:error, failed_conn, reason} -> {:disconnect, reason, failed_conn}
  end
end

# stacktrace is a bit cleaner with this function inlined
@compile inline: [send_request: 5]
defp send_request(conn, method, path, headers, body) do
Expand Down
Loading

0 comments on commit 1275855

Please sign in to comment.