Skip to content

Commit

Permalink
Properly quote and escape identifiers in dumper query (#74)
Browse files Browse the repository at this point in the history
This fixes the generated query in case any of the identifiers contains some
double quote, which could be used to do an SQL injection.

The tests are updated to use a schema name that requires quoting and a relation
name that requires escaping too.

Co-authored-by: Rueian <[email protected]>
  • Loading branch information
rjuju and rueian authored Oct 19, 2024
1 parent 4d0e60a commit 1a64198
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
10 changes: 8 additions & 2 deletions pkg/dblog/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ func (p *PGXSourceDumper) Stop() {
// Note that we have to use the upper bound as is (and therefore add knowledge
// about the maximum offset number) rather than use (block_number + 1, 0), in
// the unlikely event that we were provided the maximum block number
const DumpQuery = `select * from "%s"."%s" where ctid >= ($1::bigint, 0)::text::tid AND ctid <= ($2::bigint, 65535)::text::tid`
// Note also that the caller is responsible for providing a properly quoted and
// fully qualified relation name.
const DumpQuery = `SELECT * FROM %s WHERE ctid >= ($1::bigint, 0)::text::tid AND ctid <= ($2::bigint, 65535)::text::tid`

func (p *PGXSourceDumper) load(minLSN uint64, info *pb.DumpInfoResponse) ([]*pb.Change, error) {
ctx := context.Background()
Expand All @@ -134,7 +136,11 @@ func (p *PGXSourceDumper) load(minLSN uint64, info *pb.DumpInfoResponse) ([]*pb.
}
}

rows, err := tx.Query(ctx, fmt.Sprintf(DumpQuery, info.Schema, info.Table), info.PageBegin, info.PageEnd)
// Properly quote and escape the provided identifiers
var identifiers pgx.Identifier = []string{info.Schema, info.Table};
relation := identifiers.Sanitize()

rows, err := tx.Query(ctx, fmt.Sprintf(DumpQuery, relation), info.PageBegin, info.PageEnd)
if err != nil {
var pge *pgconn.PgError
if errors.As(err, &pge) && pge.Code == "42P01" {
Expand Down
25 changes: 14 additions & 11 deletions pkg/dblog/dumper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ func TestPGXSourceDumper(t *testing.T) {
}
defer conn.Close(ctx)

conn.Exec(ctx, "DROP SCHEMA public CASCADE; CREATE SCHEMA public")
// We explicitly use a schema name that requires quoting, and a table name
// that also requires escaping to check that the dumper code properly
// handle both of those.
conn.Exec(ctx, `DROP SCHEMA IF EXISTS "Public" CASCADE; CREATE SCHEMA "Public"`)
conn.Exec(ctx, "DROP EXTENSION IF EXISTS pgcapture")
conn.Exec(ctx, sql.InstallExtension)
conn.Exec(ctx, "CREATE TABLE t1 AS SELECT * FROM generate_series(1,100000) AS id; ANALYZE t1")
conn.Exec(ctx, `CREATE TABLE "Public"."T""1" AS SELECT * FROM generate_series(1,100000) AS id; ANALYZE "Public"."T""1"`)

dumper, err := NewPGXSourceDumper(ctx, postgresURL)
if err != nil {
Expand All @@ -36,41 +39,41 @@ func TestPGXSourceDumper(t *testing.T) {
if _, err := dumper.LoadDump(0, &pb.DumpInfoResponse{}); !errors.Is(err, ErrMissingTable) {
t.Fatal(err)
}
if _, err := dumper.LoadDump(0, &pb.DumpInfoResponse{Schema: "public", Table: "t1"}); !errors.Is(err, ErrLSNMissing) {
if _, err := dumper.LoadDump(0, &pb.DumpInfoResponse{Schema: "Public", Table: `T"1`}); !errors.Is(err, ErrLSNMissing) {
t.Fatal(err)
}

conn.Exec(ctx, "INSERT INTO pgcapture.sources (id,commit) VALUES ($1,$2)", "t1", pglogrepl.LSN(0).String())
conn.Exec(ctx, "INSERT INTO pgcapture.sources (id,commit) VALUES ($1,$2)", `Public.T"1`, pglogrepl.LSN(0).String())

if _, err := dumper.LoadDump(0, &pb.DumpInfoResponse{Schema: "any", Table: "any"}); !errors.Is(err, ErrMissingTable) {
t.Fatal(err)
}
if _, err := dumper.LoadDump(0, &pb.DumpInfoResponse{Schema: "public", Table: "any"}); !errors.Is(err, ErrMissingTable) {
if _, err := dumper.LoadDump(0, &pb.DumpInfoResponse{Schema: "Public", Table: "any"}); !errors.Is(err, ErrMissingTable) {
t.Fatal(err)
}

if _, err := dumper.LoadDump(100, &pb.DumpInfoResponse{Schema: "public", Table: "t1"}); !errors.Is(err, ErrLSNFallBehind) {
if _, err := dumper.LoadDump(100, &pb.DumpInfoResponse{Schema: "Public", Table: `T"1`}); !errors.Is(err, ErrLSNFallBehind) {
t.Fatal(err)
}

conn.Exec(ctx, "UPDATE pgcapture.sources SET commit=$2 WHERE id = $1", "t1", pglogrepl.LSN(100).String())
conn.Exec(ctx, "UPDATE pgcapture.sources SET commit=$2 WHERE id = $1", `Public.T"1`, pglogrepl.LSN(100).String())

var pages int
if err := conn.QueryRow(ctx, "select relpages from pg_class where relname = 't1'").Scan(&pages); err != nil || pages == 0 {
if err := conn.QueryRow(ctx, `SELECT relpages FROM pg_class WHERE relname = 'T"1' AND relnamespace::regnamespace::text = '"Public"'`).Scan(&pages); err != nil || pages == 0 {
t.Fatal(err)
}

seq := int32(1)
for i := uint32(0); i < uint32(pages); i += 5 {
changes, err := dumper.LoadDump(100, &pb.DumpInfoResponse{Schema: "public", Table: "t1", PageBegin: i, PageEnd: i + 4})
changes, err := dumper.LoadDump(100, &pb.DumpInfoResponse{Schema: "Public", Table: `T"1`, PageBegin: i, PageEnd: i + 4})
if err != nil {
t.Fatal(err)
}
for _, change := range changes {
if change.Schema != "public" {
if change.Schema != "Public" {
t.Fatal("unexpected")
}
if change.Table != "t1" {
if change.Table != `T"1` {
t.Fatal("unexpected")
}
if change.Op != pb.Change_UPDATE {
Expand Down

0 comments on commit 1a64198

Please sign in to comment.