Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go.mod: bump github.com/conduitio/conduit-connector-sdk from 0.12.0 to 0.13.1 #151

Merged
5 changes: 4 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: release
on:
push:
tags:
- v*
- '*'

permissions:
contents: write
Expand All @@ -18,6 +18,9 @@ jobs:
with:
fetch-depth: 0

- name: Check Connector Tag
uses: conduitio/automation/actions/check_connector_tag@main

- name: Set up Go
uses: actions/setup-go@v5
with:
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ linters:
- stylecheck
- sqlclosecheck
- tagliatelle
- tenv
- thelper
- tparallel
- typecheck
- unconvert
- unparam
- unused
- usetesting
- wastedassign
- whitespace
- wrapcheck
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ lint:
.PHONY: generate
generate:
go generate ./...
conn-sdk-cli readmegen -w

.PHONY: install-tools
install-tools:
Expand Down
294 changes: 236 additions & 58 deletions README.md

Large diffs are not rendered by default.

19 changes: 9 additions & 10 deletions acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/brianvoe/gofakeit"
"github.com/conduitio-labs/conduit-connector-mongo/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/matryer/is"
Expand Down Expand Up @@ -74,8 +73,8 @@ func TestAcceptance(t *testing.T) {
}

cfg := map[string]string{
config.KeyURI: uri,
config.KeyDB: testDB,
"uri": uri,
"db": testDB,
}

sdk.AcceptanceTest(t, driver{
Expand All @@ -99,18 +98,18 @@ func beforeTest(cfg map[string]string) func(*testing.T) {
is := is.New(t)

// create a test mongo client
mongoClient, err := createTestMongoClient(context.Background(), cfg[config.KeyURI])
mongoClient, err := createTestMongoClient(context.Background(), cfg["uri"])
is.NoErr(err)
defer func() {
err = mongoClient.Disconnect(context.Background())
is.NoErr(err)
}()

cfg[config.KeyCollection] = fmt.Sprintf("%s_%d", testCollectionPrefix, time.Now().UnixNano())
cfg["collection"] = fmt.Sprintf("%s_%d", testCollectionPrefix, time.Now().UnixNano())

// connect to the test database and create a collection
testDatabase := mongoClient.Database(cfg[config.KeyDB])
is.NoErr(testDatabase.CreateCollection(context.Background(), cfg[config.KeyCollection]))
testDatabase := mongoClient.Database(cfg["db"])
is.NoErr(testDatabase.CreateCollection(context.Background(), cfg["collection"]))
}
}

Expand All @@ -122,16 +121,16 @@ func afterTest(cfg map[string]string) func(*testing.T) {
is := is.New(t)

// create a test mongo client
mongoClient, err := createTestMongoClient(context.Background(), cfg[config.KeyURI])
mongoClient, err := createTestMongoClient(context.Background(), cfg["uri"])
is.NoErr(err)
defer func() {
err = mongoClient.Disconnect(context.Background())
is.NoErr(err)
}()

// connect to the test database and collection
testDatabase := mongoClient.Database(cfg[config.KeyDB])
testCollection := testDatabase.Collection(cfg[config.KeyCollection])
testDatabase := mongoClient.Database(cfg["db"])
testCollection := testDatabase.Collection(cfg["collection"])

// drop the test collection
err = testCollection.Drop(context.Background())
Expand Down
152 changes: 67 additions & 85 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,18 @@
package config

import (
"context"
"errors"
"fmt"
"net/url"
"os"
"strings"
"time"

"github.com/conduitio-labs/conduit-connector-mongo/validator"
"go.mongodb.org/mongo-driver/mongo/options"
)

// defaultConnectionURI is a default MongoDB connection URI string.
var defaultConnectionURI = &url.URL{Scheme: "mongodb", Host: "localhost:27017"}

const (
// KeyURI is a config name for a connection string.
KeyURI = "uri"
// KeyDB is a config name for a database.
KeyDB = "db"
// KeyCollection is a config name for a collection.
KeyCollection = "collection"
// KeyAuthUsername is a config name for a username.
KeyAuthUsername = "auth.username"
// KeyAuthPassword is a config name for a password.
KeyAuthPassword = "auth.password"
// KeyAuthDB is a config name for an authentication database.
KeyAuthDB = "auth.db"
// KeyAuthMechanism is a config name for an authentication mechanism.
KeyAuthMechanism = "auth.mechanism"
// KeyAuthTLSCAFile is a config name for a TLS CA file.
KeyAuthTLSCAFile = "auth.tls.caFile"
// KeyAuthTLSCertificateKeyFile is a config name for a TLS certificate key file.
KeyAuthTLSCertificateKeyFile = "auth.tls.certificateKeyFile"
// KeyAuthAWSSessionToken is a config name for an AWS session token.
KeyAuthAWSSessionToken = "auth.awsSessionToken" //nolint:gosec // it's not hardcoded credential

// defaultServerSelectionTimeout is a default value for the ServerSelectionTimeout option.
defaultServerSelectionTimeout = time.Second * 5

Expand Down Expand Up @@ -90,114 +68,105 @@ func (am AuthMechanism) IsValid() bool {
type Config struct {
// URI is the connection string.
// The URI can contain host names, IPv4/IPv6 literals, or an SRV record.
URI *url.URL
URIStr string `json:"uri" default:"mongodb://localhost:27017"`
uri *url.URL

// DB is the name of a database the connector must work with.
DB string `key:"db" validate:"required,max=64"`
DB string `json:"db" validate:"required"`
// Collection is the name of a collection the connector must
// write to (destination) or read from (source).
Collection string `key:"collection" validate:"required"`
Collection string `json:"collection" validate:"required"`

Auth AuthConfig
Auth AuthConfig `json:"auth"`
}

// AuthConfig contains authentication-specific configurable values.
type AuthConfig struct {
// Username is the username.
Username string `key:"auth.username"`
Username string `json:"username"`
// Password is the user's password.
Password string `key:"auth.password"`
Password string `json:"password"`
// DB is the name of a database that contains
// the user's authentication data.
DB string `key:"auth.db"`
DB string `json:"db"`
// Mechanism is the authentication mechanism.
Mechanism AuthMechanism `key:"auth.mechanism"`
Mechanism AuthMechanism `json:"mechanism"`
// TLSCAFile is the path to either a single or a bundle of
// certificate authorities to trust when making a TLS connection.
TLSCAFile string `key:"auth.tls.caFile" validate:"omitempty,file"`
//nolint:tagliatelle // we prefer our convention with dots as separators
TLSCAFile string `json:"tls.caFile"`
// TLSCertificateKeyFile is the path to the client certificate
// file or the client private key file.
TLSCertificateKeyFile string `key:"auth.tls.certificateKeyFile" validate:"omitempty,file"`
//nolint:tagliatelle // we prefer our convention with dots as separators
TLSCertificateKeyFile string `json:"tls.certificateKeyFile"`
// AWSSessionToken is an AWS session token.
AWSSessionToken string `key:"auth.awsSessionToken"`
AWSSessionToken string `json:"awsSessionToken"`
}

// Parse maps the incoming map to the [Config] and validates it.
func Parse(raw map[string]string) (Config, error) {
config := Config{
URI: defaultConnectionURI,
DB: raw[KeyDB],
Collection: raw[KeyCollection],
Auth: AuthConfig{
Username: raw[KeyAuthUsername],
Password: raw[KeyAuthPassword],
DB: raw[KeyAuthDB],
Mechanism: AuthMechanism(strings.ToUpper(raw[KeyAuthMechanism])),
TLSCAFile: raw[KeyAuthTLSCAFile],
TLSCertificateKeyFile: raw[KeyAuthTLSCertificateKeyFile],
AWSSessionToken: raw[KeyAuthAWSSessionToken],
},
func (c *Config) Validate(context.Context) error {
var errs []error
uri, err := url.Parse(c.URIStr)
if err != nil {
errs = append(errs, err)
} else {
c.uri = uri
}

// parse URI if it's not empty
if uriStr := raw[KeyURI]; uriStr != "" {
uri, err := url.Parse(uriStr)
if err != nil {
return Config{}, fmt.Errorf("parse %q: %w", KeyURI, err)
}

config.URI = uri
err = c.validatePath("auth.tls.caFile", c.Auth.TLSCAFile)
if err != nil {
errs = append(errs, err)
}

// validate auth mechanism if it's not empty
if config.Auth.Mechanism != "" && !config.Auth.Mechanism.IsValid() {
return Config{}, &InvalidAuthMechanismError{
AuthMechanism: config.Auth.Mechanism,
}
err = c.validatePath("auth.tls.certificateKeyFile", c.Auth.TLSCertificateKeyFile)
if err != nil {
errs = append(errs, err)
}

if err := validator.ValidateStruct(&config); err != nil {
return Config{}, fmt.Errorf("validate struct: %w", err)
// validate auth mechanism if it's not empty
c.Auth.Mechanism = AuthMechanism(strings.ToUpper(string(c.Auth.Mechanism)))
if c.Auth.Mechanism != "" && !c.Auth.Mechanism.IsValid() {
errs = append(errs, fmt.Errorf("invalid auth mechanism %q", c.Auth.Mechanism))
}

return config, nil
return errors.Join(errs...)
}

// GetClientOptions returns generated options for mongo connection depending on mechanism.
func (d *Config) GetClientOptions() *options.ClientOptions {
uri, properties := d.getURIAndPropertiesByMechanism()
func (c *Config) GetClientOptions() *options.ClientOptions {
uri, properties := c.getURIAndPropertiesByMechanism()
opts := options.Client().ApplyURI(uri).SetServerSelectionTimeout(defaultServerSelectionTimeout)

// If we don't have any custom auth options, we should skip adding credential options
if d.Auth == (AuthConfig{}) {
if c.Auth == (AuthConfig{}) {
return opts
}

cred := options.Credential{
AuthMechanism: string(d.Auth.Mechanism),
AuthMechanism: string(c.Auth.Mechanism),
AuthMechanismProperties: properties,
AuthSource: d.Auth.DB,
Username: d.Auth.Username,
Password: d.Auth.Password,
AuthSource: c.Auth.DB,
Username: c.Auth.Username,
Password: c.Auth.Password,
}

return opts.SetAuth(cred)
}

// getURIAndPropertiesByMechanism generates uri and options depending on auth mechanism.
func (d *Config) getURIAndPropertiesByMechanism() (string, map[string]string) {
func (c *Config) getURIAndPropertiesByMechanism() (string, map[string]string) {
//nolint:exhaustive // because most of the mechanisms using same options
switch d.Auth.Mechanism {
switch c.Auth.Mechanism {
case MongoDBX509:
uri := *d.URI
uri := *c.uri

values := uri.Query()

if d.Auth.TLSCAFile != "" {
values.Add(tlsCAFileQueryName, d.Auth.TLSCAFile)
if c.Auth.TLSCAFile != "" {
values.Add(tlsCAFileQueryName, c.Auth.TLSCAFile)
}

if d.Auth.TLSCertificateKeyFile != "" {
values.Add(tlsCertificateKeyFileQueryName, d.Auth.TLSCertificateKeyFile)
if c.Auth.TLSCertificateKeyFile != "" {
values.Add(tlsCertificateKeyFileQueryName, c.Auth.TLSCertificateKeyFile)
}

uri.RawQuery = values.Encode()
Expand All @@ -206,15 +175,28 @@ func (d *Config) getURIAndPropertiesByMechanism() (string, map[string]string) {

case MongoDBAWS:
var properties map[string]string
if d.Auth.AWSSessionToken != "" {
if c.Auth.AWSSessionToken != "" {
properties = map[string]string{
awsSessionTokenPropertyName: d.Auth.AWSSessionToken,
awsSessionTokenPropertyName: c.Auth.AWSSessionToken,
}
}

return d.URI.String(), properties
return c.uri.String(), properties

default:
return d.URI.String(), nil
return c.uri.String(), nil
}
}

func (c *Config) validatePath(paramName, path string) error {
if path == "" {
return nil
}

_, err := os.Stat(c.Auth.TLSCAFile)
if err != nil {
return fmt.Errorf("path for %s %q not valid: %w", paramName, path, err)
}

return nil
}
Loading
Loading