diff --git a/database/clickhouse/clickhouse.go b/database/clickhouse/clickhouse.go index d2b65c0ce..bcf2fa422 100644 --- a/database/clickhouse/clickhouse.go +++ b/database/clickhouse/clickhouse.go @@ -134,42 +134,53 @@ func (ch *ClickHouse) init() error { } func (ch *ClickHouse) Run(r io.Reader) error { + fmt.Println(fmt.Sprintf("Try to run clickhouse. MultiStatementEnabled:%+v", ch.config.MultiStatementEnabled)) if ch.config.MultiStatementEnabled { var err error if e := multistmt.Parse(r, multiStmtDelimiter, ch.config.MultiStatementMaxSize, func(m []byte) bool { + fmt.Println(fmt.Sprintf("Try to exec query clickhouse. query:%+v", string(m))) tq := strings.TrimSpace(string(m)) if tq == "" { return true } if _, e := ch.conn.Exec(string(m)); e != nil { + fmt.Println(fmt.Errorf("exec query clickhouse error:%+v", err)) err = database.Error{OrigErr: e, Err: "migration failed", Query: m} return false } return true }); e != nil { + fmt.Println(fmt.Errorf("multistmt parse error:%+v", err)) return e } return err } + fmt.Println(fmt.Sprintf("Try to get clickhouse migration.")) migration, err := io.ReadAll(r) if err != nil { + fmt.Println(fmt.Errorf("get clickhouse migration error:%+v", err)) return err } + fmt.Println(fmt.Sprintf("Try to exec migration clickhouse. migration:%+v", string(migration))) if _, err := ch.conn.Exec(string(migration)); err != nil { + fmt.Println(fmt.Errorf("exec migration clickhouse error:%+v", err)) return database.Error{OrigErr: err, Err: "migration failed", Query: migration} } return nil } func (ch *ClickHouse) Version() (int, bool, error) { + var ( version int dirty uint8 query = "SELECT version, dirty FROM `" + ch.config.MigrationsTable + "` ORDER BY sequence DESC LIMIT 1" ) + fmt.Println(fmt.Sprintf("Try to get version clickhouse. query:%+v", query)) if err := ch.conn.QueryRow(query).Scan(&version, &dirty); err != nil { + fmt.Println(fmt.Errorf("get version clickhouse err:%+v", err)) if err == sql.ErrNoRows { return database.NilVersion, false, nil } @@ -179,6 +190,7 @@ func (ch *ClickHouse) Version() (int, bool, error) { } func (ch *ClickHouse) SetVersion(version int, dirty bool) error { + fmt.Println(fmt.Sprintf("Try to set version clickhouse. version:%+v dirty:%+v", version, dirty)) var ( bool = func(v bool) uint8 { if v { @@ -189,11 +201,14 @@ func (ch *ClickHouse) SetVersion(version int, dirty bool) error { tx, err = ch.conn.Begin() ) if err != nil { + fmt.Println(fmt.Errorf("set version clickhouse begin error:%+v", err)) return err } query := "INSERT INTO " + ch.config.MigrationsTable + " (version, dirty, sequence) VALUES (?, ?, ?)" + fmt.Println(fmt.Sprintf("Try to set version clickhouse exec. query:%+v", query)) if _, err := tx.Exec(query, version, bool(dirty), time.Now().UnixNano()); err != nil { + fmt.Println(fmt.Errorf("set version clickhouse exec error:%+v", err)) return &database.Error{OrigErr: err, Query: []byte(query)} } diff --git a/migrate.go b/migrate.go index 44efe14e3..e81025abc 100644 --- a/migrate.go +++ b/migrate.go @@ -273,6 +273,7 @@ func (m *Migrate) Up() error { if err != nil { return m.unlockErr(err) } + fmt.Println(fmt.Sprintf("Try to do up. curVersion:%+v dirty:%+v", curVersion, dirty)) if dirty { return m.unlockErr(ErrDirty{curVersion}) @@ -534,9 +535,11 @@ func (m *Migrate) read(from int, to int, ret chan<- interface{}) { func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) { defer close(ret) + fmt.Println(fmt.Sprintf("Try to read up. from:%+v limit:%+v", from, limit)) // check if from version exists if from >= 0 { if err := m.versionExists(suint(from)); err != nil { + fmt.Println(fmt.Errorf("version exists error:%+v", err)) ret <- err return } @@ -549,20 +552,26 @@ func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) { count := 0 for count < limit || limit == -1 { + fmt.Println(fmt.Sprintf("Try to loop read up. count:%+v from:%+v", count, from)) if m.stop() { + fmt.Println(fmt.Sprintf("Exit loop read up.")) return } // apply first migration if from is nil version if from == -1 { + fmt.Println(fmt.Sprintf("Try to get first version.")) firstVersion, err := m.sourceDrv.First() if err != nil { + fmt.Println(fmt.Errorf("get first version error:%+v", err)) ret <- err return } + fmt.Println(fmt.Sprintf("Try to new first migration. firstVersion: %+v", firstVersion)) migr, err := m.newMigration(firstVersion, int(firstVersion)) if err != nil { + fmt.Println(fmt.Errorf("new first migration error:%+v", err)) ret <- err return } @@ -570,6 +579,7 @@ func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) { ret <- migr go func() { if err := migr.Buffer(); err != nil { + fmt.Println(fmt.Errorf("buffer migr error:%+v", err)) m.logErr(err) } }() @@ -578,6 +588,7 @@ func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) { continue } + fmt.Println(fmt.Sprintf("Try to get next version.")) // apply next migration next, err := m.sourceDrv.Next(suint(from)) if errors.Is(err, os.ErrNotExist) { @@ -605,12 +616,15 @@ func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) { } } if err != nil { + fmt.Println(fmt.Errorf("get next version error:%+v", err)) ret <- err return } + fmt.Println(fmt.Sprintf("Try to new next migration. nextVersion: %+v", next)) migr, err := m.newMigration(next, int(next)) if err != nil { + fmt.Println(fmt.Errorf("new next migration error:%+v", err)) ret <- err return } @@ -618,6 +632,7 @@ func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) { ret <- migr go func() { if err := migr.Buffer(); err != nil { + fmt.Println(fmt.Errorf("buffer migr error:%+v", err)) m.logErr(err) } }() @@ -723,39 +738,48 @@ func (m *Migrate) readDown(from int, limit int, ret chan<- interface{}) { // to stop execution because it might have received a stop signal on the // GracefulStop channel. func (m *Migrate) runMigrations(ret <-chan interface{}) error { + fmt.Println(fmt.Sprintf("Try to run migrations.")) for r := range ret { - if m.stop() { + fmt.Println(fmt.Sprintf("Exit loop run migration.")) return nil } switch r := r.(type) { case error: + fmt.Println(fmt.Errorf("run migration error:%+v", r)) return r case *Migration: migr := r + fmt.Println(fmt.Sprintf("Try to set dirty version. TargetVersion:%+v", migr.TargetVersion)) // set version with dirty state if err := m.databaseDrv.SetVersion(migr.TargetVersion, true); err != nil { + fmt.Println(fmt.Errorf("set dirty version error:%+v", err)) return err } if migr.Body != nil { + fmt.Println(fmt.Sprintf("Try to loop run migrations. LogString:%+v", migr.LogString())) m.logVerbosePrintf("Read and execute %v\n", migr.LogString()) if err := m.databaseDrv.Run(migr.BufferedBody); err != nil { + fmt.Println(fmt.Errorf("run error:%+v", err)) return err } } + fmt.Println(fmt.Sprintf("Try to set clenn version. TargetVersion:%+v", migr.TargetVersion)) // set clean state if err := m.databaseDrv.SetVersion(migr.TargetVersion, false); err != nil { + fmt.Println(fmt.Errorf("set clean version error:%+v", err)) return err } endTime := time.Now() readTime := migr.FinishedReading.Sub(migr.StartedBuffering) runTime := endTime.Sub(migr.FinishedReading) + fmt.Println(fmt.Sprintf("Finished %+v (read %+v, ran %+v)", migr.LogString(), readTime, runTime)) // log either verbose or normal if m.Log != nil {