diff --git a/database/clickhouse/README.md b/database/clickhouse/README.md index 9357efcfd..378c751b7 100644 --- a/database/clickhouse/README.md +++ b/database/clickhouse/README.md @@ -23,4 +23,5 @@ * Clickhouse cluster mode is not officially supported, since it's not tested right now, but you can try enabling `schema_migrations` table replication by specifying a `x-cluster-name`: * When `x-cluster-name` is specified, `x-migrations-table-engine` also should be specified. See the docs regarding [replicated table engines](https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/replication/#table_engines-replication). * When `x-cluster-name` is specified, only the `schema_migrations` table is replicated across the cluster. You still need to write your migrations so that the application tables are replicated within the cluster. + * When `x-cluster-name` is specified and `x-distributed` is set to "true", a distributed table engine is used allowing you to run commands from any host in the cluster. Note this only works in cluster mode and an `x-cluster-name` must be specified. * If you want to create database inside the migration, you should know, that table which will manage migrations `schema-migrations table` will be in `default` table, so you can't use `USE ` inside migration. In this case you may not specify the database in the connection string (example you can find [here](examples/migrations/003_create_database.up.sql)) diff --git a/database/clickhouse/clickhouse.go b/database/clickhouse/clickhouse.go index d2b65c0ce..9a06cf32e 100644 --- a/database/clickhouse/clickhouse.go +++ b/database/clickhouse/clickhouse.go @@ -30,6 +30,7 @@ var ( type Config struct { DatabaseName string ClusterName string + IsDistributed bool MigrationsTable string MigrationsTableEngine string MultiStatementEnabled bool @@ -99,6 +100,7 @@ func (ch *ClickHouse) Open(dsn string) (database.Driver, error) { MigrationsTableEngine: migrationsTableEngine, DatabaseName: purl.Query().Get("database"), ClusterName: purl.Query().Get("x-cluster-name"), + IsDistributed: purl.Query().Get("x-distributed") == "true", MultiStatementEnabled: purl.Query().Get("x-multi-statement") == "true", MultiStatementMaxSize: multiStatementMaxSize, }, @@ -130,7 +132,14 @@ func (ch *ClickHouse) init() error { ch.config.MigrationsTableEngine = DefaultMigrationsTableEngine } - return ch.ensureVersionTable() + if err := ch.ensureVersionTable(); err != nil { + return err + } + if ch.config.IsDistributed { + ch.config.MigrationsTableEngine = "ReplicatedMergeTree" // base table must be replicated to supported a distributed table + return ch.ensureDistributedTable() + } + return nil } func (ch *ClickHouse) Run(r io.Reader) error { @@ -192,7 +201,11 @@ func (ch *ClickHouse) SetVersion(version int, dirty bool) error { return err } - query := "INSERT INTO " + ch.config.MigrationsTable + " (version, dirty, sequence) VALUES (?, ?, ?)" + query := "INSERT INTO " + ch.config.DatabaseName + "." + ch.config.MigrationsTable + " (version, dirty, sequence) " + if ch.config.IsDistributed { + query = query + "SETTINGS distributed_foreground_insert = 1 " + } + query = query + "VALUES (?, ?, ?)" if _, err := tx.Exec(query, version, bool(dirty), time.Now().UnixNano()); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } @@ -233,19 +246,30 @@ func (ch *ClickHouse) ensureVersionTable() (err error) { // if not, create the empty migration table if len(ch.config.ClusterName) > 0 { - query = fmt.Sprintf(` - CREATE TABLE %s ON CLUSTER %s ( + + if ch.config.IsDistributed { // we will rename the underlying table + baseTableName := ch.config.MigrationsTable + "_local" + query = fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s ( + version Int64, + dirty UInt8, + sequence UInt64 + ) Engine=%s Primary Key (sequence)`, ch.config.DatabaseName, baseTableName, ch.config.ClusterName, ch.config.MigrationsTableEngine) + } else { // cluster mode without the distributed table + query = fmt.Sprintf(` + CREATE TABLE %s.%s ON CLUSTER %s ( version Int64, dirty UInt8, sequence UInt64 - ) Engine=%s`, ch.config.MigrationsTable, ch.config.ClusterName, ch.config.MigrationsTableEngine) + ) Engine=%s`, ch.config.DatabaseName, ch.config.MigrationsTable, ch.config.ClusterName, ch.config.MigrationsTableEngine) + } } else { query = fmt.Sprintf(` - CREATE TABLE %s ( + CREATE TABLE %s.%s ( version Int64, dirty UInt8, sequence UInt64 - ) Engine=%s`, ch.config.MigrationsTable, ch.config.MigrationsTableEngine) + ) Engine=%s`, ch.config.DatabaseName, ch.config.MigrationsTable, ch.config.MigrationsTableEngine) } if strings.HasSuffix(ch.config.MigrationsTableEngine, "Tree") { @@ -258,6 +282,17 @@ func (ch *ClickHouse) ensureVersionTable() (err error) { return nil } +func (ch *ClickHouse) ensureDistributedTable() error { + baseTableName := ch.config.MigrationsTable + "_local" + query := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s AS %s.%s Engine=Distributed(%s, %s, %s, sequence) SETTINGS fsync_after_insert = 1`, ch.config.DatabaseName, ch.config.MigrationsTable, ch.config.ClusterName, ch.config.DatabaseName, baseTableName, ch.config.ClusterName, ch.config.DatabaseName, baseTableName) + + if _, err := ch.conn.Exec(query); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + return nil +} + func (ch *ClickHouse) Drop() (err error) { query := "SHOW TABLES FROM " + quoteIdentifier(ch.config.DatabaseName) tables, err := ch.conn.Query(query) @@ -279,6 +314,10 @@ func (ch *ClickHouse) Drop() (err error) { query = "DROP TABLE IF EXISTS " + quoteIdentifier(ch.config.DatabaseName) + "." + quoteIdentifier(table) + if len(ch.config.ClusterName) > 0 { + query = query + " ON CLUSTER " + ch.config.ClusterName + } + if _, err := ch.conn.Exec(query); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} }