Skip to content

Commit 630eb84

Browse files
committed
Merge branch '282-compressed-dumps' into 'master'
feat: support compressed plain-text dumps (#282) Closes #282 See merge request postgres-ai/database-lab!314
2 parents dd260ab + da3b3f2 commit 630eb84

File tree

7 files changed

+417
-151
lines changed

7 files changed

+417
-151
lines changed

configs/config.example.logical_generic.yml

+2
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ retrieval:
244244
# database1:
245245
# # Dump format. Available formats: directory, custom, plain. Default format: directory.
246246
# format: directory
247+
# # Compression (only for plain-text dumps): "gzip", "bzip2", or "no". Default: "no".
248+
# compression: "no"
247249
# # Option for a partial restore. Do not specify the tables section to restore all available tables.
248250
# tables:
249251
# - table1

configs/config.example.logical_rds_iam.yml

+2
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,8 @@ retrieval:
252252
# database1:
253253
# # Dump format. Available formats: directory, custom, plain. Default format: directory.
254254
# format: directory
255+
# # Compression (only for plain-text dumps): "gzip", "bzip2", or "no". Default: "no".
256+
# compression: "no"
255257
# # Option for a partial restore. Do not specify the tables section to restore all available tables.
256258
# tables:
257259
# - table1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
2021 © Postgres.ai
3+
*/
4+
5+
package logical
6+
7+
import (
	"path/filepath"
	"strings"
)
10+
11+
// compressionType identifies how a plain-text dump file is compressed.
type compressionType string

// Supported compression kinds. The string values are the ones accepted by the
// "compression" option of a dump definition.
const (
	noCompression    compressionType = "no"
	gzipCompression  compressionType = "gzip"
	bzip2Compression compressionType = "bzip2"
)

// getReadingArchiveCommand returns the command used to read a dump file that
// is stored with the given compression.
//
// Any value other than gzipCompression or bzip2Compression, including the
// empty string, falls back to "cat".
func getReadingArchiveCommand(compression compressionType) string {
	switch compression {
	case gzipCompression:
		return "gunzip -c"

	case bzip2Compression:
		return "bunzip2 -c"

	default:
		return "cat"
	}
}

// getCompressionType derives the compression kind from the extension of
// filename: ".gz" maps to gzipCompression, ".bz2" to bzip2Compression, and
// everything else (including names without an extension) to noCompression.
func getCompressionType(filename string) compressionType {
	// Compare case-insensitively so that names such as "DUMP.SQL.GZ" are not
	// mistaken for uncompressed dumps.
	switch strings.ToLower(filepath.Ext(filename)) {
	case ".gz":
		return gzipCompression

	case ".bz2":
		return bzip2Compression

	default:
		return noCompression
	}
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
2021 © Postgres.ai
3+
*/
4+
5+
package logical
6+
7+
import (
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func TestReadingArchiveCommand(t *testing.T) {
14+
testCases := []struct {
15+
compressionType compressionType
16+
expectedCommand string
17+
}{
18+
{
19+
compressionType: gzipCompression,
20+
expectedCommand: "gunzip -c",
21+
},
22+
{
23+
compressionType: bzip2Compression,
24+
expectedCommand: "bunzip2 -c",
25+
},
26+
{
27+
compressionType: noCompression,
28+
expectedCommand: "cat",
29+
},
30+
{
31+
compressionType: compressionType(""),
32+
expectedCommand: "cat",
33+
},
34+
}
35+
36+
for _, tc := range testCases {
37+
command := getReadingArchiveCommand(tc.compressionType)
38+
assert.Equal(t, tc.expectedCommand, command)
39+
}
40+
}
41+
42+
func TestArchiveType(t *testing.T) {
43+
testCases := []struct {
44+
filename string
45+
expectedCompressionType compressionType
46+
}{
47+
{
48+
filename: "dump.sql.gz",
49+
expectedCompressionType: gzipCompression,
50+
},
51+
{
52+
filename: "dump.bz2",
53+
expectedCompressionType: bzip2Compression,
54+
},
55+
{
56+
filename: "test.dmp",
57+
expectedCompressionType: noCompression,
58+
},
59+
{
60+
filename: "dump",
61+
expectedCompressionType: noCompression,
62+
},
63+
}
64+
65+
for _, tc := range testCases {
66+
compressionType := getCompressionType(tc.filename)
67+
assert.Equal(t, tc.expectedCompressionType, compressionType)
68+
}
69+
}

pkg/retrieval/engine/postgres/logical/dump.go

+21-20
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,14 @@ type DumpJob struct {
7878

7979
// DumpOptions defines a logical dump options.
8080
type DumpOptions struct {
81-
DumpLocation string `yaml:"dumpLocation"`
82-
DockerImage string `yaml:"dockerImage"`
83-
ContainerConfig map[string]interface{} `yaml:"containerConfig"`
84-
Connection Connection `yaml:"connection"`
85-
Source Source `yaml:"source"`
86-
Databases map[string]DBDefinition `yaml:"databases"`
87-
ParallelJobs int `yaml:"parallelJobs"`
88-
Restore ImmediateRestore `yaml:"immediateRestore"`
81+
DumpLocation string `yaml:"dumpLocation"`
82+
DockerImage string `yaml:"dockerImage"`
83+
ContainerConfig map[string]interface{} `yaml:"containerConfig"`
84+
Connection Connection `yaml:"connection"`
85+
Source Source `yaml:"source"`
86+
Databases map[string]DumpDefinition `yaml:"databases"`
87+
ParallelJobs int `yaml:"parallelJobs"`
88+
Restore ImmediateRestore `yaml:"immediateRestore"`
8989
}
9090

9191
// Source describes source of data to dump.
@@ -95,11 +95,12 @@ type Source struct {
9595
RDS *RDSConfig `yaml:"rdsIam"`
9696
}
9797

98-
// DBDefinition describes a database for dumping.
99-
type DBDefinition struct {
100-
Tables []string `yaml:"tables"`
101-
Format string `yaml:"format"`
102-
dbName string
98+
// DumpDefinition describes a database for dumping.
99+
type DumpDefinition struct {
100+
Tables []string `yaml:"tables"`
101+
Format string `yaml:"format"`
102+
Compression compressionType `yaml:"compression"`
103+
dbName string
103104
}
104105

105106
type dumpJobConfig struct {
@@ -366,8 +367,8 @@ func (d *DumpJob) Run(ctx context.Context) (err error) {
366367
return nil
367368
}
368369

369-
func (d *DumpJob) getDBList(ctx context.Context) (map[string]DBDefinition, error) {
370-
dbList := make(map[string]DBDefinition)
370+
func (d *DumpJob) getDBList(ctx context.Context) (map[string]DumpDefinition, error) {
371+
dbList := make(map[string]DumpDefinition)
371372

372373
connStr := db.ConnectionString(d.config.db.Host, strconv.Itoa(d.config.db.Port), d.config.db.Username, d.config.db.DBName, d.getPassword())
373374

@@ -387,7 +388,7 @@ func (d *DumpJob) getDBList(ctx context.Context) (map[string]DBDefinition, error
387388
return nil, errors.Wrap(err, "failed to scan next row in database list result set")
388389
}
389390

390-
dbList[dbName] = DBDefinition{}
391+
dbList[dbName] = DumpDefinition{}
391392
}
392393

393394
return dbList, nil
@@ -420,12 +421,12 @@ func (d *DumpJob) cleanupDumpLocation(ctx context.Context, dumpContID string) er
420421
return nil
421422
}
422423

423-
func (d *DumpJob) dumpDatabase(ctx context.Context, dumpContID, dbName string, dbDefinition DBDefinition) error {
424-
dumpCommand := d.buildLogicalDumpCommand(dbName, dbDefinition.Tables)
424+
func (d *DumpJob) dumpDatabase(ctx context.Context, dumpContID, dbName string, dumpDefinition DumpDefinition) error {
425+
dumpCommand := d.buildLogicalDumpCommand(dbName, dumpDefinition.Tables)
425426
log.Msg("Running dump command: ", dumpCommand)
426427

427-
if len(dbDefinition.Tables) > 0 {
428-
log.Msg("Partial dump will be run. Tables for dumping: ", strings.Join(dbDefinition.Tables, ", "))
428+
if len(dumpDefinition.Tables) > 0 {
429+
log.Msg("Partial dump will be run. Tables for dumping: ", strings.Join(dumpDefinition.Tables, ", "))
429430
}
430431

431432
if output, err := d.performDumpCommand(ctx, dumpContID, types.ExecConfig{

0 commit comments

Comments
 (0)