From 790de76131e013d1f2bce38846a9369d53ebff9c Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Thu, 6 Feb 2025 16:24:28 +0300 Subject: [PATCH] raw api --- CHANGELOG.md | 4 + README.md | 101 ++++++----- guides/buffer.md | 0 guides/compression.md | 0 guides/format.md | 0 lib/ch.ex | 15 +- lib/ch/connection.ex | 46 ----- lib/ch/query.ex | 67 ++------ lib/ch/row_binary.ex | 6 +- mix.exs | 3 +- mix.lock | 10 ++ test/ch/aggregation_test.exs | 94 ++++++----- test/ch/compression_test.exs | 44 +++++ test/ch/connection_test.exs | 314 +++++++++++++++++++---------------- test/ch/faults_test.exs | 82 +++++---- test/ch/stream_test.exs | 9 +- 16 files changed, 414 insertions(+), 381 deletions(-) create mode 100644 guides/buffer.md create mode 100644 guides/compression.md create mode 100644 guides/format.md create mode 100644 test/ch/compression_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e3f3ab..850bcf2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased + +- move rows for INSERT from `params` to `statement` for more control (compression, format, query params, etc.) + ## 0.3.0 (2025-02-03) - gracefully handle `connection: closed` response from server https://github.com/plausible/ch/pull/211 diff --git a/README.md b/README.md index c58af25..3bd5c21 100644 --- a/README.md +++ b/README.md @@ -7,27 +7,25 @@ Minimal HTTP [ClickHouse](https://clickhouse.com) client for Elixir. 
Used in [Ecto ClickHouse adapter.](https://github.com/plausible/ecto_ch) -### Key features - -- RowBinary -- Native query parameters -- Per query settings -- Minimal API - -Your ideas are welcome [here.](https://github.com/plausible/ch/issues/82) - ## Installation ```elixir defp deps do [ - {:ch, "~> 0.3.0"} + {:ch, "~> 0.4.0"} ] end ``` ## Usage +#### Start ClickHouse + +```sh +# don't forget to stop the container once done +docker run --rm -p 8123:8123 -e CLICKHOUSE_PASSWORD=secret --ulimit nofile=262144:262144 clickhouse/clickhouse-server:latest-alpine +``` + #### Start [DBConnection](https://github.com/elixir-ecto/db_connection) pool ```elixir @@ -35,23 +33,28 @@ defaults = [ scheme: "http", hostname: "localhost", port: 8123, - database: "default", + database: "default", settings: [], pool_size: 1, timeout: :timer.seconds(15) ] -# note that starting in ClickHouse 25.1.3.23 `default` user doesn't have -# network access by default in the official Docker images -# see https://github.com/ClickHouse/ClickHouse/pull/75259 -{:ok, pid} = Ch.start_link(defaults) +custom = [ + # note that starting in ClickHouse 25.1.3.23 `default` user doesn't have + # network access by default in the official Docker images + # see https://github.com/ClickHouse/ClickHouse/pull/75259 + username: "default", + # this password was provided via `CLICKHOUSE_PASSWORD` to the container above + password: "secret" +] + +config = Keyword.merge(defaults, custom) +{:ok, pid} = Ch.start_link(config) ``` #### Select rows ```elixir -{:ok, pid} = Ch.start_link() - {:ok, %Ch.Result{rows: [[0], [1], [2]]}} = Ch.query(pid, "SELECT * FROM system.numbers LIMIT 3") @@ -70,16 +73,11 @@ Note on datetime encoding in query parameters: #### Insert rows ```elixir -{:ok, pid} = Ch.start_link() - Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") %Ch.Result{num_rows: 2} = Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES (0), (1)") -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO 
ch_demo(id) VALUES ({$0:UInt8}), ({$1:UInt32})", [0, 1]) - %Ch.Result{num_rows: 2} = Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({a:UInt16}), ({b:UInt64})", %{"a" => 0, "b" => 1}) @@ -87,53 +85,64 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") Ch.query!(pid, "INSERT INTO ch_demo(id) SELECT number FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 2}) ``` -#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) (efficient) +#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) (efficient) ```elixir -{:ok, pid} = Ch.start_link() +Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64, name String) ENGINE Null") -Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") +rows = [ + [0, "zero"], + [1, "one"], + [2, "two"] +] -types = ["UInt64"] -# or -types = [Ch.Types.u64()] -# or -types = [:u64] +types = ["UInt64", "String"] +# or types = [:u64, :string] +# or types = [Ch.Types.u64(), Ch.Types.string()] -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", [[0], [1]], types: types) -``` +sql = [ + # note the \n -- ClickHouse uses it to separate SQL from the data + "INSERT INTO ch_demo(id, name) FORMAT RowBinary\n" | Ch.RowBinary.encode_rows(rows, types) +] -Note that RowBinary format encoding requires `:types` option to be provided. +%Ch.Result{num_rows: 3} = Ch.query!(pid, sql) +``` -Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something like a type check. +Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats/RowBinaryWithNamesAndTypes) which would additionally do something like a type check. 
```elixir -sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes" -opts = [names: ["id"], types: ["UInt64"]] -rows = [[0], [1]] +names = ["id", "name"] +types = ["UInt64", "String"] + +rows = [ + [0, "zero"], + [1, "one"], + [2, "two"] +] + +sql = [ + "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n", + Ch.RowBinary.encode_names_and_types(names, types) + | Ch.RowBinary.encode_rows(rows, types) +] -%Ch.Result{num_rows: 2} = Ch.query!(pid, sql, rows, opts) +%Ch.Result{num_rows: 3} = Ch.query!(pid, sql) ``` #### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats) ```elixir -{:ok, pid} = Ch.start_link() - Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n) %Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT CSV", csv, encode: false) + Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv]) ``` #### Insert rows as chunked RowBinary stream ```elixir -{:ok, pid} = Ch.start_link() - Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") stream = Stream.repeatedly(fn -> [:rand.uniform(100)] end) @@ -150,8 +159,6 @@ This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/ #### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings) ```elixir -{:ok, pid} = Ch.start_link() - settings = [async_insert: 1] %Ch.Result{rows: [["async_insert", "Bool", "0"]]} = @@ -208,6 +215,8 @@ Ch.query!(pid, sql, inserted_rows, types: ["Nullable(UInt8)", "Nullable(UInt8)", Ch.query!(pid, "SELECT b FROM ch_nulls ORDER BY b") ``` +Or [`RowBinaryWithDefaults`](https://clickhouse.com/docs/en/interfaces/formats/RowBinaryWithDefaults). TODO. + #### UTF-8 in RowBinary When decoding [`String`](https://clickhouse.com/docs/en/sql-reference/data-types/string) columns non UTF-8 characters are replaced with `�` (U+FFFD). 
This behaviour is similar to [`toValidUTF8`](https://clickhouse.com/docs/en/sql-reference/functions/string-functions#tovalidutf8) and [JSON format.](https://clickhouse.com/docs/en/interfaces/formats#json) diff --git a/guides/buffer.md b/guides/buffer.md new file mode 100644 index 0000000..e69de29 diff --git a/guides/compression.md b/guides/compression.md new file mode 100644 index 0000000..e69de29 diff --git a/guides/format.md b/guides/format.md new file mode 100644 index 0000000..e69de29 diff --git a/lib/ch.ex b/lib/ch.ex index 6db8b0d..4599206 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -14,7 +14,7 @@ defmodule Ch do | {:scheme, String.t()} | {:hostname, String.t()} | {:port, :inet.port_number()} - | {:transport_opts, :gen_tcp.connect_option()} + | {:transport_opts, [:gen_tcp.connect_option() | :ssl.client_option()]} | DBConnection.start_option() @doc """ @@ -56,11 +56,12 @@ defmodule Ch do | {:headers, [{String.t(), String.t()}]} | {:format, String.t()} | {:types, [String.t() | atom | tuple]} - # TODO remove - | {:encode, boolean} | {:decode, boolean} | DBConnection.connection_option() + @type query_param_key :: String.t() | atom + @type query_params :: %{query_param_key => term} | [term] + @doc """ Runs a query and returns the result as `{:ok, %Ch.Result{}}` or `{:error, Exception.t()}` if there was a database error. @@ -79,9 +80,8 @@ defmodule Ch do * [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0) """ - @spec query(DBConnection.conn(), iodata, params, [query_option]) :: + @spec query(DBConnection.conn(), iodata, Ch.query_params(), [query_option]) :: {:ok, Result.t()} | {:error, Exception.t()} - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t() def query(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) @@ -94,15 +94,14 @@ defmodule Ch do Runs a query and returns the result or raises `Ch.Error` if there was an error. See `query/4`. 
""" - @spec query!(DBConnection.conn(), iodata, params, [query_option]) :: Result.t() - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t() + @spec query!(DBConnection.conn(), iodata, Ch.query_params(), [query_option]) :: Result.t() def query!(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) DBConnection.execute!(conn, query, params, opts) end @doc false - @spec stream(DBConnection.t(), iodata, map | [term], [query_option]) :: Ch.Stream.t() + @spec stream(DBConnection.t(), iodata, Ch.query_params(), [query_option]) :: Ch.Stream.t() def stream(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) %Ch.Stream{conn: conn, query: query, params: params, opts: opts} diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 09ba17e..448cbac 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -212,25 +212,6 @@ defmodule Ch.Connection do end end - def handle_execute(%Query{command: :insert} = query, params, opts, conn) do - conn = maybe_reconnect(conn) - {query_params, extra_headers, body} = params - - path = path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) - - result = - if is_function(body, 2) do - request_chunked(conn, "POST", path, headers, body, opts) - else - request(conn, "POST", path, headers, body, opts) - end - - with {:ok, conn, responses} <- result do - {:ok, query, responses, conn} - end - end - def handle_execute(query, params, opts, conn) do conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params @@ -261,33 +242,6 @@ defmodule Ch.Connection do end end - @spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), Keyword.t()) :: - {:ok, conn, [response]} - | {:error, Error.t(), conn} - | {:disconnect, Mint.Types.error(), conn} - def request_chunked(conn, method, path, headers, stream, opts) do - with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream), - {:ok, conn} <- 
stream_body(conn, ref, stream), - do: receive_full_response(conn, timeout(conn, opts)) - end - - @spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) :: - {:ok, conn} | {:disconnect, Mint.Types.error(), conn} - defp stream_body(conn, ref, stream) do - result = - stream - |> Stream.concat([:eof]) - |> Enum.reduce_while({:ok, conn}, fn - chunk, {:ok, conn} -> {:cont, HTTP.stream_request_body(conn, ref, chunk)} - _chunk, {:error, _conn, _reason} = error -> {:halt, error} - end) - - case result do - {:ok, _conn} = ok -> ok - {:error, conn, reason} -> {:disconnect, reason, conn} - end - end - # stacktrace is a bit cleaner with this function inlined @compile inline: [send_request: 5] defp send_request(conn, method, path, headers, body) do diff --git a/lib/ch/query.ex b/lib/ch/query.ex index bb71fd6..c8b640b 100644 --- a/lib/ch/query.ex +++ b/lib/ch/query.ex @@ -1,16 +1,15 @@ defmodule Ch.Query do @moduledoc "Query struct wrapping the SQL statement." - defstruct [:statement, :command, :encode, :decode] + defstruct [:statement, :command, :decode] - @type t :: %__MODULE__{statement: iodata, command: command, encode: boolean, decode: boolean} + @type t :: %__MODULE__{statement: iodata, command: command, decode: boolean} @doc false @spec build(iodata, [Ch.query_option()]) :: t def build(statement, opts \\ []) do command = Keyword.get(opts, :command) || extract_command(statement) - encode = Keyword.get(opts, :encode, true) decode = Keyword.get(opts, :decode, true) - %__MODULE__{statement: statement, command: command, encode: encode, decode: decode} + %__MODULE__{statement: statement, command: command, decode: decode} end statements = [ @@ -64,6 +63,7 @@ defmodule Ch.Query do end defp extract_command([first_segment | _] = statement) do + # TODO extract_command(first_segment) || extract_command(IO.iodata_to_binary(statement)) end @@ -79,9 +79,11 @@ defimpl DBConnection.Query, for: Ch.Query do @spec describe(Query.t(), [Ch.query_option()]) :: Query.t() def 
describe(query, _opts), do: query + @type query_params :: [{String.t(), String.t()}] + # stream: insert init @spec encode(Query.t(), {:stream, term}, [Ch.query_option()]) :: - {:stream, {[{String.t(), String.t()}], Mint.Types.headers(), iodata}} + {:stream, {query_params, Mint.Types.headers(), iodata}} def encode(query, {:stream, params}, opts) do {:stream, encode(query, params, opts)} end @@ -93,57 +95,18 @@ defimpl DBConnection.Query, for: Ch.Query do {:stream, ref, data} end - @spec encode(Query.t(), params, [Ch.query_option()]) :: - {query_params, Mint.Types.headers(), body} - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t(), - query_params: [{String.t(), String.t()}], - body: iodata | Enumerable.t() - - def encode(%Query{command: :insert, encode: false, statement: statement}, data, opts) do - body = - case data do - _ when is_list(data) or is_binary(data) -> [statement, ?\n | data] - _ -> Stream.concat([[statement, ?\n]], data) - end - - {_query_params = [], headers(opts), body} - end - + @spec encode(Query.t(), Ch.query_params(), [Ch.query_option()]) :: + {query_params, Mint.Types.headers(), iodata} def encode(%Query{command: :insert, statement: statement}, params, opts) do - cond do - names = Keyword.get(opts, :names) -> - types = Keyword.fetch!(opts, :types) - header = RowBinary.encode_names_and_types(names, types) - data = RowBinary.encode_rows(params, types) - {_query_params = [], headers(opts), [statement, ?\n, header | data]} - - format_row_binary?(statement) -> - types = Keyword.fetch!(opts, :types) - data = RowBinary.encode_rows(params, types) - {_query_params = [], headers(opts), [statement, ?\n | data]} - - true -> - {query_params(params), headers(opts), statement} - end + {query_params(params), headers(opts), statement} end def encode(%Query{statement: statement}, params, opts) do - types = Keyword.get(opts, :types) - default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes" + default_format = 
"RowBinaryWithNamesAndTypes" format = Keyword.get(opts, :format) || default_format {query_params(params), [{"x-clickhouse-format", format} | headers(opts)], statement} end - defp format_row_binary?(statement) when is_binary(statement) do - statement |> String.trim_trailing() |> String.ends_with?("RowBinary") - end - - defp format_row_binary?(statement) when is_list(statement) do - statement - |> IO.iodata_to_binary() - |> format_row_binary?() - end - # stream: select result @spec decode(Query.t(), result, [Ch.query_option()]) :: result when result: Result.t() def decode(_query, %Result{} = result, _opts), do: result @@ -174,16 +137,11 @@ defimpl DBConnection.Query, for: Ch.Query do %Result{rows: data, data: data, command: command, headers: headers} end - def decode(%Query{command: command}, responses, opts) when is_list(responses) do + def decode(%Query{command: command}, responses, _opts) when is_list(responses) do # TODO potentially fails on x-progress-headers [_status, headers | data] = responses case get_header(headers, "x-clickhouse-format") do - "RowBinary" -> - types = Keyword.fetch!(opts, :types) - rows = data |> IO.iodata_to_binary() |> RowBinary.decode_rows(types) - %Result{num_rows: length(rows), rows: rows, command: command, headers: headers} - "RowBinaryWithNamesAndTypes" -> rows = data |> IO.iodata_to_binary() |> RowBinary.decode_rows() %Result{num_rows: length(rows), rows: rows, command: command, headers: headers} @@ -204,6 +162,7 @@ defimpl DBConnection.Query, for: Ch.Query do Enum.map(params, fn {k, v} -> {"param_#{k}", encode_param(v)} end) end + # TODO defp query_params(params) when is_list(params) do params |> Enum.with_index() diff --git a/lib/ch/row_binary.ex b/lib/ch/row_binary.ex index 584f603..485cc9f 100644 --- a/lib/ch/row_binary.ex +++ b/lib/ch/row_binary.ex @@ -1,5 +1,5 @@ defmodule Ch.RowBinary do - @moduledoc "Helpers for working with ClickHouse [`RowBinary`](https://clickhouse.com/docs/en/sql-reference/formats#rowbinary) format." 
+ @moduledoc "Helpers for working with ClickHouse [`RowBinary`](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) format." # @compile {:bin_opt_info, true} @dialyzer :no_improper_lists @@ -28,7 +28,7 @@ defmodule Ch.RowBinary do defp encode_types([] = done), do: done @doc """ - Encodes a single row to [`RowBinary`](https://clickhouse.com/docs/en/sql-reference/formats#rowbinary) as iodata. + Encodes a single row to [`RowBinary`](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) as iodata. Examples: @@ -50,7 +50,7 @@ defmodule Ch.RowBinary do defp _encode_row([] = done, []), do: done @doc """ - Encodes multiple rows to [`RowBinary`](https://clickhouse.com/docs/en/sql-reference/formats#rowbinary) as iodata. + Encodes multiple rows to [`RowBinary`](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) as iodata. Examples: diff --git a/mix.exs b/mix.exs index 6bcea1d..a4170bf 100644 --- a/mix.exs +++ b/mix.exs @@ -41,7 +41,8 @@ defmodule Ch.MixProject do {:benchee, "~> 1.0", only: [:bench]}, {:dialyxir, "~> 1.0", only: [:dev], runtime: false}, {:ex_doc, ">= 0.0.0", only: :docs}, - {:tz, "~> 0.28.1", only: [:test]} + {:tz, "~> 0.28.1", only: [:test]}, + {:nimble_lz4, "~> 1.1", only: [:test]} ] end diff --git a/mix.lock b/mix.lock index 7eb5428..fa698cd 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,6 @@ %{ "benchee": {:hex, :benchee, "1.3.1", "c786e6a76321121a44229dde3988fc772bca73ea75170a73fd5f4ddf1af95ccf", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "76224c58ea1d0391c8309a8ecbfe27d71062878f59bd41a390266bf4ac1cc56d"}, + "castore": {:hex, :castore, "1.0.11", "4bbd584741601eb658007339ea730b082cc61f3554cf2e8f39bf693a11b49073", [:mix], [], "hexpm", "e03990b4db988df56262852f20de0f659871c35154691427a5047f4967a16a62"}, "db_connection": 
{:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"}, "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, @@ -8,14 +9,23 @@ "ecto": {:hex, :ecto, "3.12.5", "4a312960ce612e17337e7cefcf9be45b95a3be6b36b6f94dfb3d8c361d631866", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6eb18e80bef8bb57e17f5a7f068a1719fbda384d40fc37acb8eb8aeca493b6ea"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "ex_doc": {:hex, :ex_doc, "0.36.1", "4197d034f93e0b89ec79fac56e226107824adcce8d2dd0a26f5ed3a95efc36b1", [:mix], [{:earmark_parser, "~> 1.4.42", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d7d26a7cf965dacadcd48f9fa7b5953d7d0cfa3b44fa7a65514427da44eafd89"}, + "finch": {:hex, :finch, "0.19.0", 
"c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"}, "hpax": {:hex, :hpax, "1.0.2", "762df951b0c399ff67cc57c3995ec3cf46d696e41f0bba17da0518d94acd4aac", [:mix], [], "hexpm", "2f09b4c1074e0abd846747329eaa26d535be0eb3d189fa69d812bfb8bfefd32f"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"}, + "mime": {:hex, :mime, "2.0.6", 
"8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, "mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"}, + "nimble_lz4": {:hex, :nimble_lz4, "1.1.0", "53b87e37f1efc79fda6433ab35563788a628c7d33aef45d16f31a86a399a3cc5", [:mix], [{:rustler, "~> 0.34.0", [hex: :rustler, repo: "hexpm", optional: false]}, {:rustler_precompiled, "~> 0.7.2", [hex: :rustler_precompiled, repo: "hexpm", optional: false]}], "hexpm", "2c1d46eee76c5bbba8d6d3d23c75210dcb509f6698f0a01fb95015bf95f1b6d3"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, + "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, + "req": {:hex, :req, "0.5.8", "50d8d65279d6e343a5e46980ac2a70e97136182950833a1968b371e753f6a662", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: 
:nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "d7fc5898a566477e174f26887821a3c5082b243885520ee4b45555f5d53f40ef"}, + "rustler": {:hex, :rustler, "0.34.0", "e9a73ee419fc296a10e49b415a2eb87a88c9217aa0275ec9f383d37eed290c1c", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:toml, "~> 0.6", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "1d0c7449482b459513003230c0e2422b0252245776fe6fd6e41cb2b11bd8e628"}, + "rustler_precompiled": {:hex, :rustler_precompiled, "0.7.3", "42cb9449785cd86c87453e39afdd27a0bdfa5c77a4ec5dc5ce45112e06b9f89b", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "cbc4b3777682e5f6f43ed39b0e0b4a42dccde8053aba91b4514e8f5ff9a5ac6d"}, "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, + "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, "tz": {:hex, :tz, "0.28.1", "717f5ffddfd1e475e2a233e221dc0b4b76c35c4b3650b060c8e3ba29dd6632e9", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:mint, "~> 1.6", [hex: :mint, repo: "hexpm", optional: true]}], "hexpm", "bfdca1aa1902643c6c43b77c1fb0cb3d744fd2f09a8a98405468afdee0848c8a"}, } diff --git a/test/ch/aggregation_test.exs b/test/ch/aggregation_test.exs index 37bd8a3..6706d66 100644 --- 
a/test/ch/aggregation_test.exs +++ b/test/ch/aggregation_test.exs @@ -87,22 +87,27 @@ defmodule Ch.AggregationTest do ) ENGINE AggregatingMergeTree ORDER BY uid """) - rows = [ - [1, ~N[2020-01-02 00:00:00], "b"], - [1, ~N[2020-01-01 00:00:00], "a"] - ] + rowbinary = + Ch.RowBinary.encode_rows( + _rows = [ + [1, ~N[2020-01-02 00:00:00], "b"], + [1, ~N[2020-01-01 00:00:00], "a"] + ], + _types = ["Int16", "DateTime", "String"] + ) assert %{num_rows: 2} = Ch.query!( conn, - """ - INSERT INTO test_insert_aggregate_function - SELECT uid, updated, arrayReduce('argMaxState', [name], [updated]) - FROM input('uid Int16, updated DateTime, name String') - FORMAT RowBinary\ - """, - rows, - types: ["Int16", "DateTime", "String"] + [ + """ + INSERT INTO test_insert_aggregate_function + SELECT uid, updated, arrayReduce('argMaxState', [name], [updated]) + FROM input('uid Int16, updated DateTime, name String') + FORMAT RowBinary + """ + | rowbinary + ] ) assert Ch.query!(conn, """ @@ -124,14 +129,21 @@ defmodule Ch.AggregationTest do ) ENGINE AggregatingMergeTree ORDER BY uid """) + rowbinary = + Ch.RowBinary.encode_rows( + _rows = [ + [1231, ~N[2020-01-02 00:00:00], "Jane"], + [1231, ~N[2020-01-01 00:00:00], "John"] + ], + _types = ["Int16", "DateTime", "String"] + ) + Ch.query!( conn, - "INSERT INTO test_users_ephemeral_column(uid, updated, name_stub) FORMAT RowBinary", - _rows = [ - [1231, ~N[2020-01-02 00:00:00], "Jane"], - [1231, ~N[2020-01-01 00:00:00], "John"] - ], - types: ["Int16", "DateTime", "String"] + [ + "INSERT INTO test_users_ephemeral_column(uid, updated, name_stub) FORMAT RowBinary\n" + | rowbinary + ] ) assert Ch.query!(conn, """ @@ -150,18 +162,25 @@ defmodule Ch.AggregationTest do ) ENGINE AggregatingMergeTree ORDER BY uid """) + rowbinary = + Ch.RowBinary.encode_rows( + _rows = [ + [1231, ~N[2020-01-02 00:00:00], "Jane"], + [1231, ~N[2020-01-01 00:00:00], "John"] + ], + _types = ["Int16", "DateTime", "String"] + ) + Ch.query!( conn, - """ - INSERT INTO 
test_users_input_function - SELECT uid, updated, arrayReduce('argMaxState', [name], [updated]) - FROM input('uid Int16, updated DateTime, name String') FORMAT RowBinary\ - """, - _rows = [ - [1231, ~N[2020-01-02 00:00:00], "Jane"], - [1231, ~N[2020-01-01 00:00:00], "John"] - ], - types: ["Int16", "DateTime", "String"] + [ + """ + INSERT INTO test_users_input_function + SELECT uid, updated, arrayReduce('argMaxState', [name], [updated]) + FROM input('uid Int16, updated DateTime, name String') FORMAT RowBinary + """ + | rowbinary + ] ) assert Ch.query!(conn, """ @@ -194,15 +213,16 @@ defmodule Ch.AggregationTest do FROM test_users_ne """) - Ch.query!( - conn, - "INSERT INTO test_users_ne FORMAT RowBinary", - _rows = [ - [1231, ~N[2020-01-02 00:00:00], "Jane"], - [1231, ~N[2020-01-01 00:00:00], "John"] - ], - types: ["Int16", "DateTime", "String"] - ) + rowbinary = + Ch.RowBinary.encode_rows( + _rows = [ + [1231, ~N[2020-01-02 00:00:00], "Jane"], + [1231, ~N[2020-01-01 00:00:00], "John"] + ], + _types = ["Int16", "DateTime", "String"] + ) + + Ch.query!(conn, ["INSERT INTO test_users_ne FORMAT RowBinary\n" | rowbinary]) assert Ch.query!(conn, """ SELECT uid, max(updated) AS updated, argMaxMerge(name) diff --git a/test/ch/compression_test.exs b/test/ch/compression_test.exs new file mode 100644 index 0000000..569851f --- /dev/null +++ b/test/ch/compression_test.exs @@ -0,0 +1,44 @@ +defmodule Ch.CompressionTest do + use ExUnit.Case + + setup do + {:ok, conn: start_supervised!({Ch, database: Ch.Test.database()})} + end + + test "gzip", %{conn: conn} do + Ch.query!(conn, "create table gzip_insert_test(i Int64, s String) engine Memory") + + rowbinary = + Ch.RowBinary.encode_rows( + [[1, "alice"], [2, "bob"], [3, "alice"]], + _types = ["Int64", "String"] + ) + + compressed = :zlib.gzip(["insert into gzip_insert_test(i, s) format RowBinary\n" | rowbinary]) + + assert %Ch.Result{num_rows: 3} = + Ch.query!( + conn, + compressed, + _no_params = [], + command: :insert, + headers: 
[{"content-encoding", "gzip"}] + ) + + assert Ch.query!(conn, "select i, s from gzip_insert_test order by i").rows == [ + [1, "alice"], + [2, "bob"], + [3, "alice"] + ] + end + + # NOTE: ClickHouse uses custom LZ4 frame format + # TODO: example https://github.com/ClickHouse/clickhouse-rs/blob/main/src/compression/lz4.rs + # we need cityhash to generate checksum + @tag :skip + test "lz4" + + # TODO: https://github.com/erlang/otp/pull/9316 + @tag :skip + test "zstd" +end diff --git a/test/ch/connection_test.exs b/test/ch/connection_test.exs index ff00898..6a322ca 100644 --- a/test/ch/connection_test.exs +++ b/test/ch/connection_test.exs @@ -10,10 +10,6 @@ defmodule Ch.ConnectionTest do assert {:ok, %{num_rows: 1, rows: [[1]]}} = Ch.query(conn, "select 1") end - test "select with types", %{conn: conn} do - assert {:ok, %{num_rows: 1, rows: [[1]]}} = Ch.query(conn, "select 1", [], types: ["UInt8"]) - end - test "select with params", %{conn: conn} do assert {:ok, %{num_rows: 1, rows: [[1]]}} = Ch.query(conn, "select {a:UInt8}", %{"a" => 1}) @@ -155,11 +151,15 @@ defmodule Ch.ConnectionTest do tokyo = DateTime.shift_zone!(utc, "Asia/Tokyo") vienna = DateTime.shift_zone!(utc, "Europe/Vienna") - rows = [["taipei", taipei], ["tokyo", tokyo], ["vienna", vienna]] + rowbinary = + Ch.RowBinary.encode_rows( + _rows = [["taipei", taipei], ["tokyo", tokyo], ["vienna", vienna]], + _types = ["String", "DateTime"] + ) - Ch.query!(conn, "insert into ch_non_utc_datetimes(name, datetime) format RowBinary", rows, - types: ["String", "DateTime"] - ) + Ch.query!(conn, [ + "insert into ch_non_utc_datetimes(name, datetime) format RowBinary\n" | rowbinary + ]) result = conn @@ -281,26 +281,14 @@ defmodule Ch.ConnectionTest do assert message =~ "Cannot execute query in readonly mode." 
end - test "automatic RowBinary", %{conn: conn, table: table} do - stmt = "insert into #{table}(a, b) format RowBinary" - types = ["UInt8", "String"] - rows = [[1, "a"], [2, "b"]] - assert %{num_rows: 2} = Ch.query!(conn, stmt, rows, types: types) - - assert %{rows: rows} = - Ch.query!(conn, "select * from {table:Identifier}", %{"table" => table}) - - assert rows == [[1, "a"], [2, "b"]] - end - test "manual RowBinary", %{conn: conn, table: table} do - stmt = "insert into #{table}(a, b) format RowBinary" + stmt = "insert into #{table}(a, b) format RowBinary\n" types = ["UInt8", "String"] rows = [[1, "a"], [2, "b"]] data = RowBinary.encode_rows(rows, types) - assert %{num_rows: 2} = Ch.query!(conn, stmt, data, encode: false) + assert %{num_rows: 2} = Ch.query!(conn, [stmt | data]) assert %{rows: rows} = Ch.query!(conn, "select * from {table:Identifier}", %{"table" => table}) @@ -317,13 +305,15 @@ defmodule Ch.ConnectionTest do |> Stream.chunk_every(2) |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) - assert {:ok, %{num_rows: 3}} = - Ch.query( - conn, - "insert into #{table}(a, b) format RowBinary", - stream, - encode: false - ) + Ch.run(conn, fn conn -> + stream + |> Stream.into( + Ch.stream(conn, "insert into {table:Identifier}(a, b) format RowBinary\n", %{ + "table" => table + }) + ) + |> Stream.run() + end) assert {:ok, %{rows: rows}} = Ch.query(conn, "select * from {table:Identifier}", %{"table" => table}) @@ -431,18 +421,19 @@ defmodule Ch.ConnectionTest do Ch.query!(conn, "create table fixed_string_t(a FixedString(3)) engine = Memory") + rowbinary = + RowBinary.encode_rows( + _rows = [ + [""], + ["a"], + ["aa"], + ["aaa"] + ], + _types = ["FixedString(3)"] + ) + assert {:ok, %{num_rows: 4}} = - Ch.query( - conn, - "insert into fixed_string_t(a) format RowBinary", - [ - [""], - ["a"], - ["aa"], - ["aaa"] - ], - types: ["FixedString(3)"] - ) + Ch.query(conn, ["insert into fixed_string_t(a) format RowBinary\n" | rowbinary]) assert 
Ch.query!(conn, "select * from fixed_string_t").rows == [ [<<0, 0, 0>>], @@ -475,17 +466,18 @@ defmodule Ch.ConnectionTest do Ch.query!(conn, "create table decimal_t(d Decimal32(4)) engine = Memory") + rowbinary = + RowBinary.encode_rows( + _rows = [ + [Decimal.new("2.66")], + [Decimal.new("2.6666")], + [Decimal.new("2.66666")] + ], + _types = ["Decimal32(4)"] + ) + assert %{num_rows: 3} = - Ch.query!( - conn, - "insert into decimal_t(d) format RowBinary", - _rows = [ - [Decimal.new("2.66")], - [Decimal.new("2.6666")], - [Decimal.new("2.66666")] - ], - types: ["Decimal32(4)"] - ) + Ch.query!(conn, ["insert into decimal_t(d) format RowBinary\n" | rowbinary]) assert Ch.query!(conn, "select * from decimal_t").rows == [ [Decimal.new("2.6600")], @@ -507,12 +499,10 @@ defmodule Ch.ConnectionTest do Ch.query!(conn, "INSERT INTO test_bool VALUES (1, true),(2,0)") - Ch.query!( - conn, - "insert into test_bool(A, B) format RowBinary", - _rows = [[3, true], [4, false]], - types: ["Int64", "Bool"] - ) + rowbinary = + RowBinary.encode_rows(_rows = [[3, true], [4, false]], _types = ["Int64", "Bool"]) + + Ch.query!(conn, ["insert into test_bool(A, B) format RowBinary\n" | rowbinary]) # anything > 0 is `true`, here `2` is `true` Ch.query!(conn, "insert into test_bool(A, B) values (5, 2)") @@ -550,12 +540,10 @@ defmodule Ch.ConnectionTest do Ch.query!(conn, "INSERT INTO t_uuid (y) VALUES ('Example 2')") - Ch.query!( - conn, - "insert into t_uuid(x,y) format RowBinary", - _rows = [[uuid, "Example 3"]], - types: ["UUID", "String"] - ) + rowbinary = + RowBinary.encode_rows(_rows = [[uuid, "Example 3"]], _types = ["UUID", "String"]) + + Ch.query!(conn, ["insert into t_uuid(x,y) format RowBinary\n" | rowbinary]) assert {:ok, %{ @@ -613,12 +601,13 @@ defmodule Ch.ConnectionTest do [2, "hello", 1] ] - Ch.query!( - conn, - "INSERT INTO t_enum(i, x) FORMAT RowBinary", - _rows = [[3, "hello"], [4, "world"], [5, 1], [6, 2]], - types: ["UInt8", "Enum8('hello' = 1, 'world' = 2)"] - ) + 
rowbinary = + RowBinary.encode_rows( + _rows = [[3, "hello"], [4, "world"], [5, 1], [6, 2]], + _types = ["UInt8", "Enum8('hello' = 1, 'world' = 2)"] + ) + + Ch.query!(conn, ["INSERT INTO t_enum(i, x) FORMAT RowBinary\n" | rowbinary]) assert Ch.query!(conn, "SELECT *, CAST(x, 'Int8') FROM t_enum ORDER BY i").rows == [ [0, "hello", 1], @@ -662,21 +651,22 @@ defmodule Ch.ConnectionTest do [0] ] - assert Ch.query!( - conn, - "INSERT INTO table_map FORMAT RowBinary", - _rows = [ - [%{"key10" => 20, "key20" => 40}], - # empty map - [%{}], - # null map - [nil], - # empty proplist map - [[]], - [[{"key50", 100}]] - ], - types: ["Map(String, UInt64)"] - ) + rowbinary = + RowBinary.encode_rows( + _rows = [ + [%{"key10" => 20, "key20" => 40}], + # empty map + [%{}], + # null map + [nil], + # empty proplist map + [[]], + [[{"key50", 100}]] + ], + _types = ["Map(String, UInt64)"] + ) + + assert Ch.query!(conn, ["INSERT INTO table_map FORMAT RowBinary\n" | rowbinary]) assert Ch.query!(conn, "SELECT * FROM table_map ORDER BY a ASC").rows == [ [%{}], @@ -713,13 +703,14 @@ defmodule Ch.ConnectionTest do assert %{num_rows: 2} = Ch.query!(conn, "INSERT INTO tuples_t VALUES (('y', 10)), (('x',-10))") + rowbinary = + RowBinary.encode_rows( + _rows = [[{"a", 20}], [{"b", 30}]], + _types = ["Tuple(String, Int64)"] + ) + assert %{num_rows: 2} = - Ch.query!( - conn, - "INSERT INTO tuples_t FORMAT RowBinary", - _rows = [[{"a", 20}], [{"b", 30}]], - types: ["Tuple(String, Int64)"] - ) + Ch.query!(conn, ["INSERT INTO tuples_t FORMAT RowBinary\n" | rowbinary]) assert Ch.query!(conn, "SELECT a FROM tuples_t ORDER BY a.1 ASC").rows == [ [{"a", 20}], @@ -825,12 +816,13 @@ defmodule Ch.ConnectionTest do assert {:ok, %{num_rows: 1, rows: [[~D[1900-01-01], "1900-01-01"]]}} = Ch.query(conn, "select {$0:Date32} as d, toString(d)", [~D[1900-01-01]]) - Ch.query!( - conn, - "insert into new(timestamp, event_id) format RowBinary", - _rows = [[~D[1960-01-01], 3]], - types: ["Date32", "UInt8"] - ) + 
rowbinary = + RowBinary.encode_rows( + _rows = [[~D[1960-01-01], 3]], + _types = ["Date32", "UInt8"] + ) + + Ch.query!(conn, ["insert into new(timestamp, event_id) format RowBinary\n" | rowbinary]) assert %{ num_rows: 3, @@ -885,12 +877,13 @@ defmodule Ch.ConnectionTest do Ch.query!( conn, - "insert into datetime64_t(event_id, timestamp) format RowBinary", - _rows = [ - [4, ~N[2021-01-01 12:00:00.123456]], - [5, ~N[2021-01-01 12:00:00]] - ], - types: ["UInt8", "DateTime64(3)"] + [ + "insert into datetime64_t(event_id, timestamp) format RowBinary\n" + | RowBinary.encode_rows( + _rows = [[4, ~N[2021-01-01 12:00:00.123456]], [5, ~N[2021-01-01 12:00:00]]], + _types = ["UInt8", "DateTime64(3)"] + ) + ] ) assert {:ok, %{num_rows: 2, rows: rows}} = @@ -975,9 +968,7 @@ defmodule Ch.ConnectionTest do assert {:ok, %{num_rows: 5}} = Ch.query( conn, - "insert into nullable format RowBinary", - <<1, 2, 3, 4, 5>>, - encode: false + ["insert into nullable format RowBinary\n" | <<1, 2, 3, 4, 5>>] ) assert %{num_rows: 1, rows: [[count]]} = @@ -996,12 +987,13 @@ defmodule Ch.ConnectionTest do ) ENGINE Memory """) - Ch.query!( - conn, - "INSERT INTO ch_nulls(a, b, c, d) FORMAT RowBinary", - [[nil, nil, nil, nil]], - types: ["UInt8", "Nullable(UInt8)", "UInt8", "Nullable(UInt8)"] - ) + rowbinary = + RowBinary.encode_row( + _row = [nil, nil, nil, nil], + _types = ["UInt8", "Nullable(UInt8)", "UInt8", "Nullable(UInt8)"] + ) + + Ch.query!(conn, ["INSERT INTO ch_nulls(a, b, c, d) FORMAT RowBinary\n" | rowbinary]) # default is ignored... 
assert Ch.query!(conn, "SELECT * FROM ch_nulls").rows == [[0, nil, 0, nil]] @@ -1016,16 +1008,23 @@ defmodule Ch.ConnectionTest do ) ENGINE Memory """) + rowbinary = + RowBinary.encode_rows( + _rows = [[1, nil], [-1, nil]], + _types = ["Int32", "Nullable(String)"] + ) + Ch.query!( conn, - """ - INSERT INTO test_insert_default_value - SELECT id, name - FROM input('id UInt32, name Nullable(String)') - FORMAT RowBinary\ - """, - [[1, nil], [-1, nil]], - types: ["UInt32", "Nullable(String)"] + [ + """ + INSERT INTO test_insert_default_value + SELECT id, name + FROM input('id UInt32, name Nullable(String)') + FORMAT RowBinary + """ + | rowbinary + ] ) assert Ch.query!(conn, "SELECT * FROM test_insert_default_value ORDER BY n").rows == @@ -1050,7 +1049,10 @@ defmodule Ch.ConnectionTest do test "can insert and select Point", %{conn: conn} do Ch.query!(conn, "CREATE TABLE geo_point (p Point) ENGINE = Memory()") Ch.query!(conn, "INSERT INTO geo_point VALUES((10, 10))") - Ch.query!(conn, "INSERT INTO geo_point FORMAT RowBinary", [[{20, 20}]], types: ["Point"]) + + Ch.query!(conn, [ + "INSERT INTO geo_point FORMAT RowBinary\n" | RowBinary.encode_row([{20, 20}], ["Point"]) + ]) assert Ch.query!(conn, "SELECT p, toTypeName(p) FROM geo_point ORDER BY p ASC").rows == [ [{10.0, 10.0}, "Point"], @@ -1084,7 +1086,10 @@ defmodule Ch.ConnectionTest do Ch.query!(conn, "INSERT INTO geo_ring VALUES([(0, 0), (10, 0), (10, 10), (0, 10)])") ring = [{20, 20}, {0, 0}, {0, 20}] - Ch.query!(conn, "INSERT INTO geo_ring FORMAT RowBinary", [[ring]], types: ["Ring"]) + + Ch.query!(conn, [ + "INSERT INTO geo_ring FORMAT RowBinary\n" | RowBinary.encode_row([ring], ["Ring"]) + ]) assert Ch.query!(conn, "SELECT r, toTypeName(r) FROM geo_ring ORDER BY r ASC").rows == [ [[{0.0, 0.0}, {10.0, 0.0}, {10.0, 10.0}, {0.0, 10.0}], "Ring"], @@ -1125,7 +1130,11 @@ defmodule Ch.ConnectionTest do ) polygon = [[{0, 1.0}, {10, 3.2}], [], [{2, 2}]] - Ch.query!(conn, "INSERT INTO geo_polygon FORMAT RowBinary", 
[[polygon]], types: ["Polygon"]) + + Ch.query!(conn, [ + "INSERT INTO geo_polygon FORMAT RowBinary\n" + | RowBinary.encode_row([polygon], ["Polygon"]) + ]) assert Ch.query!(conn, "SELECT pg, toTypeName(pg) FROM geo_polygon ORDER BY pg ASC").rows == [ @@ -1183,9 +1192,10 @@ defmodule Ch.ConnectionTest do multipolygon = [[[{0.0, 1.0}, {10.0, 3.0}], [], [{2, 2}]], [], [[{3, 3}]]] - Ch.query!(conn, "INSERT INTO geo_multipolygon FORMAT RowBinary", [[multipolygon]], - types: ["MultiPolygon"] - ) + Ch.query!(conn, [ + "INSERT INTO geo_multipolygon FORMAT RowBinary\n" + | RowBinary.encode_row([multipolygon], ["MultiPolygon"]) + ]) assert Ch.query!(conn, "SELECT mpg, toTypeName(mpg) FROM geo_multipolygon ORDER BY mpg ASC").rows == [ @@ -1362,46 +1372,58 @@ defmodule Ch.ConnectionTest do end test "error on type mismatch", %{conn: conn} do - stmt = "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes" + stmt = "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes\n" rows = [["AB", "rare", -42]] names = ["country_code", "rare_string", "maybe_int32"] - opts = [ - names: names, - types: [Ch.Types.fixed_string(2), Ch.Types.string(), Ch.Types.nullable(Ch.Types.u32())] + bad_types_1 = [ + Ch.Types.fixed_string(2), + Ch.Types.string(), + Ch.Types.nullable(Ch.Types.u32()) ] - assert {:error, %Ch.Error{code: 117, message: message}} = Ch.query(conn, stmt, rows, opts) + assert {:error, %Ch.Error{code: 117, message: message}} = + Ch.query(conn, [ + stmt, + RowBinary.encode_names_and_types(names, bad_types_1) + | RowBinary.encode_rows(rows, bad_types_1) + ]) + assert message =~ "Type of 'rare_string' must be LowCardinality(String), not String" - opts = [ - names: names, - types: [ - Ch.Types.fixed_string(2), - Ch.Types.low_cardinality(Ch.Types.string()), - Ch.Types.nullable(Ch.Types.u32()) - ] + bad_types_2 = [ + Ch.Types.fixed_string(2), + Ch.Types.low_cardinality(Ch.Types.string()), + Ch.Types.nullable(Ch.Types.u32()) ] - assert {:error, 
%Ch.Error{code: 117, message: message}} = Ch.query(conn, stmt, rows, opts) + assert {:error, %Ch.Error{code: 117, message: message}} = + Ch.query(conn, [ + stmt, + RowBinary.encode_names_and_types(names, bad_types_2) + | RowBinary.encode_rows(rows, bad_types_2) + ]) + assert message =~ "Type of 'maybe_int32' must be Nullable(Int32), not Nullable(UInt32)" end test "ok on valid types", %{conn: conn} do - stmt = "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes" + stmt = "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes\n" rows = [["AB", "rare", -42]] names = ["country_code", "rare_string", "maybe_int32"] - opts = [ - names: names, - types: [ - Ch.Types.fixed_string(2), - Ch.Types.low_cardinality(Ch.Types.string()), - Ch.Types.nullable(Ch.Types.i32()) - ] + types = [ + Ch.Types.fixed_string(2), + Ch.Types.low_cardinality(Ch.Types.string()), + Ch.Types.nullable(Ch.Types.i32()) ] - assert {:ok, %{num_rows: 1}} = Ch.query(conn, stmt, rows, opts) + assert {:ok, %{num_rows: 1}} = + Ch.query(conn, [ + stmt, + RowBinary.encode_names_and_types(names, types) + | RowBinary.encode_rows(rows, types) + ]) end end end diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index 921a177..6354280 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -423,13 +423,22 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.close(mint) spawn_link(fn -> - assert {:error, %Mint.TransportError{reason: :closed}} = - Ch.query( - conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - encode: false - ) + err = + catch_error( + Ch.run(conn, fn conn -> + stream + |> Stream.into( + Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n") + ) + |> Stream.run() + end) + ) + + assert {err.__struct__, Exception.message(err)} in [ + {DBConnection.ConnectionError, + "connection is closed because of an error, disconnect or timeout"}, + {Mint.TransportError, "socket closed"} + ] end) # reconnect @@ -440,15 +449,15 
@@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) spawn_link(fn -> - assert {:error, %Ch.Error{code: 60, message: message}} = - Ch.query( - conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - encode: false - ) - - assert message =~ ~r/UNKNOWN_TABLE/ + assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn -> + Ch.run(conn, fn conn -> + stream + |> Stream.into( + Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n") + ) + |> Stream.run() + end) + end send(test, :done) end) @@ -482,13 +491,22 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) spawn_link(fn -> - assert {:error, %Mint.TransportError{reason: :closed}} = - Ch.query( - conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - encode: false - ) + err = + catch_error( + Ch.run(conn, fn conn -> + stream + |> Stream.into( + Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n") + ) + |> Stream.run() + end) + ) + + assert {err.__struct__, Exception.message(err)} in [ + {DBConnection.ConnectionError, + "connection is closed because of an error, disconnect or timeout"}, + {Mint.TransportError, "socket closed"} + ] end) # close after first packet from mint arrives @@ -503,15 +521,15 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) spawn_link(fn -> - assert {:error, %Ch.Error{code: 60, message: message}} = - Ch.query( - conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - encode: false - ) - - assert message =~ ~r/UNKNOWN_TABLE/ + assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn -> + Ch.run(conn, fn conn -> + stream + |> Stream.into( + Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n") + ) + |> Stream.run() + end) + end send(test, :done) end) diff --git a/test/ch/stream_test.exs b/test/ch/stream_test.exs index 32bcf43..786be3f 100644 --- a/test/ch/stream_test.exs +++ b/test/ch/stream_test.exs @@ -48,14 +48,7 @@ defmodule 
Ch.StreamTest do |> Stream.chunk_every(100_000) |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) |> Stream.take(10) - |> Enum.into( - Ch.stream( - conn, - "insert into collect_stream(i) format RowBinary", - _params = [], - encode: false - ) - ) + |> Enum.into(Ch.stream(conn, "insert into collect_stream(i) format RowBinary\n")) end) assert Ch.query!(conn, "select count(*) from collect_stream").rows == [[1_000_000]]