Skip to content

Commit 5cef8f0

Browse files
committed
add transaction to sync mode
1 parent cbb2ade commit 5cef8f0

File tree

3 files changed

+21
-3
lines changed

3 files changed

+21
-3
lines changed

repository/repository_pg.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
// postgres driver
7+
"github.com/jackc/pgx/v4"
78
"github.com/jackc/pgx/v4/pgxpool"
89
log "github.com/sirupsen/logrus"
910
)
@@ -13,9 +14,10 @@ type Postgres struct {
1314
pool *pgxpool.Pool
1415
}
1516

16-
// PostgresConn postgres conn repo
17+
// PostgresConn postgres conn repo and a tx transaction
1718
type PostgresConn struct {
1819
Conn *pgxpool.Conn
20+
Tx pgx.Tx
1921
}
2022

2123
// NewPostgres return a new postgres repository

syncer/repository_pg.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (s *Service) getTableColumns(ctx context.Context, sourceConn *repository.Po
5050
// truncateTable
5151
func (s *Service) truncateTable(ctx context.Context, conn *repository.PostgresConn, schema, table string) (err error) {
5252
query := fmt.Sprintf("truncate table %s.%s", schema, table)
53-
_, err = conn.Conn.Exec(ctx, query)
53+
_, err = conn.Tx.Exec(ctx, query)
5454
return
5555
}
5656

@@ -92,7 +92,7 @@ func (s *Service) copyFromSelect(ctx context.Context, sourceConn *repository.Pos
9292

9393
destinationIdentifier := pgx.Identifier{s.Access.DestinationSchema, s.Access.DestinationTable}
9494

95-
copyCount, err := destinationConn.Conn.CopyFrom(ctx, destinationIdentifier, sourceColumns, rows)
95+
copyCount, err := destinationConn.Tx.CopyFrom(ctx, destinationIdentifier, sourceColumns, rows)
9696
if err != nil {
9797
log.Errorf("service.copyFromSelect(): sourceConn.Conn.CopyFrom() error=%w", err)
9898
return

syncer/service.go

+16
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ func (s *Service) Run(ctx context.Context) error {
4646
switch s.Access.SyncMode {
4747
case FullSync:
4848
log.Debugf("sync_mode selected: %s", FullSync)
49+
50+
// open the transaction to this mode
51+
destinationConn.Tx, err = destinationConn.Conn.Conn().Begin(ctx)
52+
if err != nil {
53+
log.Errorf("service.Run(): destinationConn.Conn.Conn().Begin() error=%w", err)
54+
return err
55+
}
56+
defer destinationConn.Tx.Rollback(ctx)
57+
4958
err = s.truncateTable(ctx, destinationConn, s.Access.DestinationSchema, s.Access.DestinationTable)
5059
if err != nil {
5160
log.Errorf("service.Run(): s.truncateTable() error=%w", err)
@@ -57,6 +66,13 @@ func (s *Service) Run(ctx context.Context) error {
5766
log.Errorf("service.Run(): s.copyFromSelect() error=%w", err)
5867
return err
5968
}
69+
70+
err = destinationConn.Tx.Commit(ctx)
71+
if err != nil {
72+
log.Errorf("service.Run(): destinationConn.Tx.Commit() error=%w", err)
73+
return err
74+
}
75+
6076
log.Debugf("finish the job, sync_mode selected: %s", FullSync)
6177

6278
default:

0 commit comments

Comments
 (0)