Skip to content

Commit 3348e57

Browse files
joe-mannBrigitte Lamarchejulienschmidtjeffcharlesjmhodges
authored
Implement zlib compression (#1487)
Implemented the SQL compression protocol. This new feature is enabled by: * Adding `compress=true` in DSN. * `cfg.Apply(Compress(True))` Co-authored-by: Brigitte Lamarche <[email protected]> Co-authored-by: Julien Schmidt <[email protected]> Co-authored-by: Jeffrey Charles <[email protected]> Co-authored-by: Jeff Hodges <[email protected]> Co-authored-by: Daniel Montoya <[email protected]> Co-authored-by: Justin Li <[email protected]> Co-authored-by: Dave Stubbs <[email protected]> Co-authored-by: Linh Tran Tuan <[email protected]> Co-authored-by: Robert R. Russell <[email protected]> Co-authored-by: INADA Naoki <[email protected]> Co-authored-by: Kieron Woodhouse <[email protected]> Co-authored-by: Alexey Palazhchenko <[email protected]> Co-authored-by: Reed Allman <[email protected]> Co-authored-by: Joe Mann <[email protected]>
1 parent c9f41c0 commit 3348e57

17 files changed

+581
-109
lines changed

.github/workflows/test.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ jobs:
8383
my-cnf: |
8484
innodb_log_file_size=256MB
8585
innodb_buffer_pool_size=512MB
86-
max_allowed_packet=16MB
86+
max_allowed_packet=48MB
8787
; TestConcurrent fails if max_connections is too large
8888
max_connections=50
8989
local_infile=1

AUTHORS

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ Animesh Ray <mail.rayanimesh at gmail.com>
2121
Arne Hormann <arnehormann at gmail.com>
2222
Ariel Mashraki <ariel at mashraki.co.il>
2323
Asta Xie <xiemengjun at gmail.com>
24+
B Lamarche <blam413 at gmail.com>
2425
Brian Hendriks <brian at dolthub.com>
2526
Bulat Gaifullin <gaifullinbf at gmail.com>
2627
Caine Jette <jette at alum.mit.edu>
@@ -62,6 +63,7 @@ Jennifer Purevsuren <jennifer at dolthub.com>
6263
Jerome Meyer <jxmeyer at gmail.com>
6364
Jiajia Zhong <zhong2plus at gmail.com>
6465
Jian Zhen <zhenjl at gmail.com>
66+
Joe Mann <contact at joemann.co.uk>
6567
Joshua Prunier <joshua.prunier at gmail.com>
6668
Julien Lefevre <julien.lefevr at gmail.com>
6769
Julien Schmidt <go-sql-driver at julienschmidt.com>

README.md

+11
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ A MySQL-Driver for Go's [database/sql](https://golang.org/pkg/database/sql/) pac
3838
* Secure `LOAD DATA LOCAL INFILE` support with file allowlisting and `io.Reader` support
3939
* Optional `time.Time` parsing
4040
* Optional placeholder interpolation
41+
* Supports zlib compression.
4142

4243
## Requirements
4344

@@ -267,6 +268,16 @@ SELECT u.id FROM users as u
267268

268269
will return `u.id` instead of just `id` if `columnsWithAlias=true`.
269270

271+
##### `compress`
272+
273+
```
274+
Type: bool
275+
Valid Values: true, false
276+
Default: false
277+
```
278+
279+
Toggles zlib compression. false by default.
280+
270281
##### `interpolateParams`
271282

272283
```

benchmark_test.go

+20-8
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,13 @@ func (tb *TB) checkStmt(stmt *sql.Stmt, err error) *sql.Stmt {
4646
return stmt
4747
}
4848

49-
func initDB(b *testing.B, queries ...string) *sql.DB {
49+
func initDB(b *testing.B, useCompression bool, queries ...string) *sql.DB {
5050
tb := (*TB)(b)
51-
db := tb.checkDB(sql.Open(driverNameTest, dsn))
51+
comprStr := ""
52+
if useCompression {
53+
comprStr = "&compress=1"
54+
}
55+
db := tb.checkDB(sql.Open(driverNameTest, dsn+comprStr))
5256
for _, query := range queries {
5357
if _, err := db.Exec(query); err != nil {
5458
b.Fatalf("error on %q: %v", query, err)
@@ -60,10 +64,18 @@ func initDB(b *testing.B, queries ...string) *sql.DB {
6064
const concurrencyLevel = 10
6165

6266
func BenchmarkQuery(b *testing.B) {
67+
benchmarkQueryHelper(b, false)
68+
}
69+
70+
func BenchmarkQueryCompression(b *testing.B) {
71+
benchmarkQueryHelper(b, true)
72+
}
73+
74+
func benchmarkQueryHelper(b *testing.B, compr bool) {
6375
tb := (*TB)(b)
6476
b.StopTimer()
6577
b.ReportAllocs()
66-
db := initDB(b,
78+
db := initDB(b, compr,
6779
"DROP TABLE IF EXISTS foo",
6880
"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
6981
`INSERT INTO foo VALUES (1, "one")`,
@@ -222,7 +234,7 @@ func BenchmarkInterpolation(b *testing.B) {
222234
},
223235
maxAllowedPacket: maxPacketSize,
224236
maxWriteSize: maxPacketSize - 1,
225-
buf: newBuffer(nil),
237+
buf: newBuffer(),
226238
}
227239

228240
args := []driver.Value{
@@ -269,7 +281,7 @@ func benchmarkQueryContext(b *testing.B, db *sql.DB, p int) {
269281
}
270282

271283
func BenchmarkQueryContext(b *testing.B) {
272-
db := initDB(b,
284+
db := initDB(b, false,
273285
"DROP TABLE IF EXISTS foo",
274286
"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
275287
`INSERT INTO foo VALUES (1, "one")`,
@@ -305,7 +317,7 @@ func benchmarkExecContext(b *testing.B, db *sql.DB, p int) {
305317
}
306318

307319
func BenchmarkExecContext(b *testing.B) {
308-
db := initDB(b,
320+
db := initDB(b, false,
309321
"DROP TABLE IF EXISTS foo",
310322
"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
311323
`INSERT INTO foo VALUES (1, "one")`,
@@ -323,7 +335,7 @@ func BenchmarkExecContext(b *testing.B) {
323335
// "size=" means size of each blobs.
324336
func BenchmarkQueryRawBytes(b *testing.B) {
325337
var sizes []int = []int{100, 1000, 2000, 4000, 8000, 12000, 16000, 32000, 64000, 256000}
326-
db := initDB(b,
338+
db := initDB(b, false,
327339
"DROP TABLE IF EXISTS bench_rawbytes",
328340
"CREATE TABLE bench_rawbytes (id INT PRIMARY KEY, val LONGBLOB)",
329341
)
@@ -376,7 +388,7 @@ func BenchmarkQueryRawBytes(b *testing.B) {
376388
// BenchmarkReceiveMassiveRows measures performance of receiving large number of rows.
377389
func BenchmarkReceiveMassiveRows(b *testing.B) {
378390
// Setup -- prepare 10000 rows.
379-
db := initDB(b,
391+
db := initDB(b, false,
380392
"DROP TABLE IF EXISTS foo",
381393
"CREATE TABLE foo (id INT PRIMARY KEY, val TEXT)")
382394
defer db.Close()

buffer.go

+10-16
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@ package mysql
1010

1111
import (
1212
"io"
13-
"net"
14-
"time"
1513
)
1614

1715
const defaultBufSize = 4096
1816
const maxCachedBufSize = 256 * 1024
1917

18+
// readerFunc is a function that compatible with io.Reader.
19+
// We use this function type instead of io.Reader because we want to
20+
// just pass mc.readWithTimeout.
21+
type readerFunc func([]byte) (int, error)
22+
2023
// A buffer which is used for both reading and writing.
2124
// This is possible since communication on each connection is synchronous.
2225
// In other words, we can't write and read simultaneously on the same connection.
@@ -25,15 +28,12 @@ const maxCachedBufSize = 256 * 1024
2528
type buffer struct {
2629
buf []byte // read buffer.
2730
cachedBuf []byte // buffer that will be reused. len(cachedBuf) <= maxCachedBufSize.
28-
nc net.Conn
29-
timeout time.Duration
3031
}
3132

3233
// newBuffer allocates and returns a new buffer.
33-
func newBuffer(nc net.Conn) buffer {
34+
func newBuffer() buffer {
3435
return buffer{
3536
cachedBuf: make([]byte, defaultBufSize),
36-
nc: nc,
3737
}
3838
}
3939

@@ -43,7 +43,7 @@ func (b *buffer) busy() bool {
4343
}
4444

4545
// fill reads into the read buffer until at least _need_ bytes are in it.
46-
func (b *buffer) fill(need int) error {
46+
func (b *buffer) fill(need int, r readerFunc) error {
4747
// we'll move the contents of the current buffer to dest before filling it.
4848
dest := b.cachedBuf
4949

@@ -64,13 +64,7 @@ func (b *buffer) fill(need int) error {
6464
copy(dest[:n], b.buf)
6565

6666
for {
67-
if b.timeout > 0 {
68-
if err := b.nc.SetReadDeadline(time.Now().Add(b.timeout)); err != nil {
69-
return err
70-
}
71-
}
72-
73-
nn, err := b.nc.Read(dest[n:])
67+
nn, err := r(dest[n:])
7468
n += nn
7569

7670
if err == nil && n < need {
@@ -92,10 +86,10 @@ func (b *buffer) fill(need int) error {
9286

9387
// returns next N bytes from buffer.
9488
// The returned slice is only guaranteed to be valid until the next read
95-
func (b *buffer) readNext(need int) ([]byte, error) {
89+
func (b *buffer) readNext(need int, r readerFunc) ([]byte, error) {
9690
if len(b.buf) < need {
9791
// refill
98-
if err := b.fill(need); err != nil {
92+
if err := b.fill(need, r); err != nil {
9993
return nil, err
10094
}
10195
}

0 commit comments

Comments
 (0)