diff --git a/README.md b/README.md index 6d89fbf..fec725f 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,24 @@ The connector supports both password and private key authentication methods. ## Destination +Destination connects to a remote server. It takes an `opencdc.Record`, extracts filename from the metadata and upload the file to the remote server. The connector supports both password and private key authentication methods. The connector will sync only those files that are in the source directory itself. +Destination also supports large file uploads. Source can provide large file content chunk by chunk (one record per chunk). Each record should have following in metadata: + +* `filename`: Filename of the file with extension. +* `file_size`: Integer size of the file. +* `chunk_index`: Index of the chunk (starting from 1). +* `total_chunks`: Total number of chunks. +* `hash`: Unique hash, which is used to create temporary file till the last chunk is uploaded. + ### Configuration Options -![scarf pixel](https://static.scarf.sh/a.png?x-pxid=64b333ae-77ad-4895-a5cd-a73bb14362d9) \ No newline at end of file +| name | description | required | +| -------------- | ----------------------------------------------------------------------------------------------------- | -------- | +| `address` | Address is the address of the sftp server to connect.| **true** | +| `hostKey` | HostKey is the key used for host key callback validation.| **true** | +| `username`| User is the username of the SFTP user. | **true** | +| `password`| Password is the SFTP password (can be used as passphrase for private key). | false | +| `privateKeyPath`| PrivateKeyPath is the private key for ssh login.| false | +| `directoryPath` | DirectoryPath is the path to the directory to read/write data. | true | + +![scarf pixel](https://static.scarf.sh/a.png?x-pxid=64b333ae-77ad-4895-a5cd-a73bb14362d9) diff --git a/acceptance_test.go b/acceptance_test.go new file mode 100644 index 0000000..2b51a04 --- /dev/null +++ b/acceptance_test.go @@ -0,0 +1,95 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sftp + +import ( + "fmt" + "os/exec" + "sync/atomic" + "testing" + "time" + + "github.com/conduitio-labs/conduit-connector-sftp/common" + "github.com/conduitio-labs/conduit-connector-sftp/config" + "github.com/conduitio-labs/conduit-connector-sftp/destination" + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-connector-sdk" +) + +type driver struct { + sdk.ConfigurableAcceptanceTestDriver + id int64 +} + +func (d *driver) GenerateRecord(_ *testing.T, _ opencdc.Operation) opencdc.Record { + atomic.AddInt64(&d.id, 1) + + content := []byte("hello world") + filename := fmt.Sprintf("%d.txt", d.id) + + return sdk.Util.Source.NewRecordCreate( + nil, + map[string]string{ + opencdc.MetadataCollection: "upload", + opencdc.MetadataCreatedAt: time.Now().UTC().Format(time.RFC3339), + "filename": filename, + "hash": common.GenerateFileHash(filename, time.Now(), 11), + "file_size": fmt.Sprintf("%d", len(content)), + "mod_time": time.Now().UTC().Format(time.RFC3339), + }, + opencdc.StructuredData{"filename": filename}, + opencdc.RawData(content), + ) +} + +func (d *driver) ReadFromDestination(_ *testing.T, records []opencdc.Record) []opencdc.Record { + return records +} + +func TestAcceptance(t *testing.T) { + hostKey, err := setupHostKey() + if err != nil { + fmt.Println(err) + return + } + + sdk.AcceptanceTest(t, &driver{ + ConfigurableAcceptanceTestDriver: sdk.ConfigurableAcceptanceTestDriver{ + Config: sdk.ConfigurableAcceptanceTestDriverConfig{ + Connector: sdk.Connector{ + NewSpecification: Specification, + NewDestination: destination.NewDestination, + NewSource: nil, + }, + DestinationConfig: map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "user", + config.ConfigPassword: "pass", + config.ConfigDirectoryPath: "/destination", + }, + }, + }, + }) +} + +func setupHostKey() (string, error) { + cmd := exec.Command("ssh-keyscan", "-t", "rsa", "-p", "2222", "localhost") + output, err := cmd.Output() + if err != nil { + return "", fmt.Errorf("error setupHostKey: %w", err) + } + return string(output), nil +} diff --git a/common/hash.go b/common/hash.go new file mode 100644 index 0000000..813cdc2 --- /dev/null +++ b/common/hash.go @@ -0,0 +1,29 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "crypto/md5" //nolint: gosec // MD5 used for non-cryptographic unique identifier + "encoding/hex" + "fmt" + "time" +) + +// GenerateFileHash creates a unique hash based on file name, mod time, and size. +func GenerateFileHash(fileName string, modTime time.Time, fileSize int64) string { + data := fmt.Sprintf("%s|%s|%d", fileName, modTime.Format(time.RFC3339), fileSize) + hash := md5.Sum([]byte(data)) //nolint: gosec // MD5 used for non-cryptographic unique identifier + return hex.EncodeToString(hash[:]) +} diff --git a/common/sshauth.go b/common/sshauth.go new file mode 100644 index 0000000..e39cb6b --- /dev/null +++ b/common/sshauth.go @@ -0,0 +1,98 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "bytes" + "fmt" + "net" + "os" + + "golang.org/x/crypto/ssh" +) + +var ErrUntrustedKey = fmt.Errorf("host key does not match the trusted key") + +type MismatchKeyTypeError struct { + key1, key2 string +} + +func (e MismatchKeyTypeError) Error() string { + return fmt.Sprintf("host key type mismatch: got %s, want %s", e.key1, e.key2) +} + +func NewMismatchKeyTypeError(key1, key2 string) MismatchKeyTypeError { + return MismatchKeyTypeError{key1, key2} +} + +func SSHConfigAuth(remoteHostKey, username, password, privateKeyPath string) (*ssh.ClientConfig, error) { + //nolint:dogsled // not required here. + hostKey, _, _, _, err := ssh.ParseAuthorizedKey([]byte(remoteHostKey)) + if err != nil { + return nil, fmt.Errorf("failed to parse host key: %w", err) + } + + hostKeyCallback := func(_ string, _ net.Addr, key ssh.PublicKey) error { + if key.Type() != hostKey.Type() { + return NewMismatchKeyTypeError(key.Type(), hostKey.Type()) + } + + if !bytes.Equal(key.Marshal(), hostKey.Marshal()) { + return ErrUntrustedKey + } + + return nil + } + + sshConfig := &ssh.ClientConfig{ + User: username, + HostKeyCallback: hostKeyCallback, + } + + if privateKeyPath != "" { + auth, err := authWithPrivateKey(privateKeyPath, password) + if err != nil { + return nil, err + } + + sshConfig.Auth = []ssh.AuthMethod{auth} + return sshConfig, nil + } + + sshConfig.Auth = []ssh.AuthMethod{ssh.Password(password)} + return sshConfig, nil +} + +func authWithPrivateKey(privateKeyPath, password string) (ssh.AuthMethod, error) { + key, err := os.ReadFile(privateKeyPath) + if err != nil { + return nil, fmt.Errorf("failed to read private key file: %w", err) + } + + if password != "" { + signer, err := ssh.ParsePrivateKeyWithPassphrase(key, []byte(password)) + if err != nil { + return nil, fmt.Errorf("failed to parse private key: %w", err) + } + return ssh.PublicKeys(signer), nil + } + + signer, err := ssh.ParsePrivateKey(key) + if err != nil { + return nil, fmt.Errorf("failed to parse private key: %w", err) + } + + return ssh.PublicKeys(signer), nil +} diff --git a/config/config.go b/config/config.go index cb2b5ce..28b3998 100644 --- a/config/config.go +++ b/config/config.go @@ -16,7 +16,9 @@ package config -import "fmt" +import ( + "fmt" +) var ErrEmptyAuthFields = fmt.Errorf("both %q and %q can not be empty", ConfigPassword, ConfigPrivateKeyPath) @@ -26,7 +28,7 @@ type Config struct { Address string `json:"address" validate:"required"` // HostKey is the key used for host key callback validation. HostKey string `json:"hostKey" validate:"required"` - // User is the SFTP user. + // User is the username of the SFTP user. Username string `json:"username" validate:"required"` // Password is the SFTP password (can be used as passphrase for private key). Password string `json:"password"` diff --git a/config/paramgen.go b/config/paramgen.go index 93426de..e1a321c 100644 --- a/config/paramgen.go +++ b/config/paramgen.go @@ -56,7 +56,7 @@ func (Config) Parameters() map[string]config.Parameter { }, ConfigUsername: { Default: "", - Description: "User is the SFTP user.", + Description: "User is the username of the SFTP user.", Type: config.ParameterTypeString, Validations: []config.Validation{ config.ValidationRequired{}, diff --git a/connector.go b/connector.go index c944c50..0923019 100644 --- a/connector.go +++ b/connector.go @@ -15,6 +15,7 @@ package sftp import ( + "github.com/conduitio-labs/conduit-connector-sftp/destination" source "github.com/conduitio-labs/conduit-connector-sftp/source" sdk "github.com/conduitio/conduit-connector-sdk" ) @@ -22,6 +23,6 @@ import ( // Connector combines all constructors for each plugin in one struct. var Connector = sdk.Connector{ NewSpecification: Specification, + NewDestination: destination.NewDestination, NewSource: source.NewSource, - NewDestination: nil, } diff --git a/destination/destination.go b/destination/destination.go new file mode 100644 index 0000000..0f2345e --- /dev/null +++ b/destination/destination.go @@ -0,0 +1,325 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package destination + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "os" + "strconv" + + "github.com/conduitio-labs/conduit-connector-sftp/common" + "github.com/conduitio-labs/conduit-connector-sftp/config" + commonsConfig "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit-commons/lang" + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/pkg/sftp" + "golang.org/x/crypto/ssh" +) + +type Destination struct { + sdk.UnimplementedDestination + + config config.Config + sshClient *ssh.Client + sftpClient *sftp.Client +} + +func NewDestination() sdk.Destination { + return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware( + sdk.DestinationWithSchemaExtractionConfig{ + PayloadEnabled: lang.Ptr(false), + KeyEnabled: lang.Ptr(false), + }, + )...) +} + +func (d *Destination) Parameters() commonsConfig.Parameters { + return d.config.Parameters() +} + +func (d *Destination) Configure(ctx context.Context, cfg commonsConfig.Config) error { + sdk.Logger(ctx).Info().Msg("Configuring Destination...") + err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters()) + if err != nil { + return fmt.Errorf("invalid config: %w", err) + } + + err = d.config.Validate() + if err != nil { + return fmt.Errorf("error validating configuration: %w", err) + } + + return nil +} + +func (d *Destination) Open(ctx context.Context) error { + sdk.Logger(ctx).Info().Msg("Opening a SFTP Destination...") + sshConfig, err := common.SSHConfigAuth(d.config.HostKey, d.config.Username, d.config.Password, d.config.PrivateKeyPath) + if err != nil { + return fmt.Errorf("failed to create SSH config: %w", err) + } + + d.sshClient, err = ssh.Dial("tcp", d.config.Address, sshConfig) + if err != nil { + return fmt.Errorf("failed to dial SSH: %w", err) + } + + d.sftpClient, err = sftp.NewClient(d.sshClient) + if err != nil { + d.sshClient.Close() + return fmt.Errorf("failed to create SFTP client: %w", err) + } + + _, err = d.sftpClient.Stat(d.config.DirectoryPath) + if err != nil { + return fmt.Errorf("remote path does not exist: %w", err) + } + + return nil +} + +func (d *Destination) Write(_ context.Context, records []opencdc.Record) (int, error) { + for i, record := range records { + chunked, ok := record.Metadata["is_chunked"] + if ok && chunked == "true" { + err := d.handleChunkedRecord(record) + if err != nil { + return i, err + } + continue + } + + err := d.uploadFile(record) + if err != nil { + return i, err + } + } + + return len(records), nil +} + +func (d *Destination) Teardown(ctx context.Context) error { + sdk.Logger(ctx).Info().Msg("Tearing down the SFTP Destination") + + var errs []error + if d.sftpClient != nil { + if err := d.sftpClient.Close(); err != nil { + errs = append(errs, fmt.Errorf("close SFTP client: %w", err)) + } + } + + if d.sshClient != nil { + if err := d.sshClient.Close(); err != nil { + errs = append(errs, fmt.Errorf("close SSH client: %w", err)) + } + } + + if len(errs) > 0 { + return fmt.Errorf("error teardown: %w", errors.Join(errs...)) + } + + return nil +} + +func (d *Destination) handleChunkedRecord(record opencdc.Record) error { + metaData, err := d.extractMetadata(record) + if err != nil { + return err + } + + var remoteFile *sftp.File + path := fmt.Sprintf("%s/%s.tmp", d.config.DirectoryPath, metaData.hash) + if metaData.index == 1 { + remoteFile, err = d.sftpClient.Create(path) + if err != nil { + return fmt.Errorf("failed to create remote file: %w", err) + } + } else { + remoteFile, err = d.sftpClient.OpenFile(path, os.O_WRONLY|os.O_APPEND) + if err != nil { + return fmt.Errorf("failed to open remote file: %w", err) + } + } + + reader := bytes.NewReader(record.Payload.After.Bytes()) + _, err = reader.WriteTo(remoteFile) + if err != nil { + return fmt.Errorf("failed to write content to remote file: %w", err) + } + remoteFile.Close() + + if metaData.index == metaData.totalChunks { + // compare the uploaded filesize with the source filesize to confirm successful upload + err = d.compareFileSize(path, metaData.filesize) + if err != nil { + return err + } + + err = d.renameFile(path, fmt.Sprintf("%s/%s", d.config.DirectoryPath, metaData.filename)) + if err != nil { + return err + } + } + + return nil +} + +func (d *Destination) uploadFile(record opencdc.Record) error { + metaData, err := d.extractMetadata(record) + if err != nil { + return err + } + + path := fmt.Sprintf("%s/%s", d.config.DirectoryPath, metaData.filename) + remoteFile, err := d.sftpClient.Create(path) + if err != nil { + return fmt.Errorf("failed to create remote file: %w", err) + } + defer remoteFile.Close() + + reader := bytes.NewReader(record.Payload.After.Bytes()) + _, err = reader.WriteTo(remoteFile) + if err != nil { + return fmt.Errorf("failed to write content to remote file: %w", err) + } + + // compare the uploaded filesize with the source filesize to confirm successful upload + err = d.compareFileSize(path, metaData.filesize) + if err != nil { + return err + } + + return nil +} + +func (d *Destination) structurizeData(data opencdc.Data) (opencdc.StructuredData, error) { + if data == nil || len(data.Bytes()) == 0 { + return opencdc.StructuredData{}, nil + } + + structuredData := make(opencdc.StructuredData) + if err := json.Unmarshal(data.Bytes(), &structuredData); err != nil { + return nil, fmt.Errorf("unmarshal data into structured data: %w", err) + } + + return structuredData, nil +} + +func (d *Destination) compareFileSize(path string, size int64) error { + stat, err := d.sftpClient.Stat(path) + if err != nil { + return fmt.Errorf("failed to stat remote file: %w", err) + } + + if stat.Size() != size { + return NewInvalidFileError(fmt.Sprintf("uploaded filesize(%v) is different than source filesize(%v)", stat.Size(), size)) + } + + return nil +} + +type metadata struct { + index int64 + totalChunks int64 + hash string + filename string + filesize int64 +} + +func (d *Destination) extractMetadata(record opencdc.Record) (metadata, error) { + var index, total int64 + chunked, ok := record.Metadata["is_chunked"] + if ok && chunked == "true" { + chunkIndex, ok := record.Metadata["chunk_index"] + if !ok { + return metadata{}, NewInvalidChunkError("chunk_index not found") + } + + var err error + index, err = strconv.ParseInt(chunkIndex, 10, 64) + if err != nil { + return metadata{}, fmt.Errorf("failed to parse chunk_index: %w", err) + } + + totalChunks, ok := record.Metadata["total_chunks"] + if !ok { + return metadata{}, NewInvalidChunkError("total_chunk not found") + } + + total, err = strconv.ParseInt(totalChunks, 10, 64) + if err != nil { + return metadata{}, fmt.Errorf("failed to parse total_chunks: %w", err) + } + } + + hash, ok := record.Metadata["hash"] + if !ok { + return metadata{}, NewInvalidChunkError("hash not found") + } + + filename, ok := record.Metadata["filename"] + if !ok { + structuredKey, err := d.structurizeData(record.Key) + if err != nil { + filename = string(record.Key.Bytes()) + } else { + name, ok := structuredKey["filename"].(string) + if !ok { + return metadata{}, NewInvalidChunkError("invalid filename") + } + filename = name + } + } + + fileSize, ok := record.Metadata["file_size"] + if !ok { + return metadata{}, NewInvalidChunkError("file_size not found") + } + size, err := strconv.ParseInt(fileSize, 10, 64) + if err != nil { + return metadata{}, fmt.Errorf("failed to parse file_size: %w", err) + } + + return metadata{ + index: index, + totalChunks: total, + hash: hash, + filename: filename, + filesize: size, + }, nil +} + +func (d *Destination) renameFile(path, newPath string) error { + // check if file already exists then remove it before renaming + _, err := d.sftpClient.Stat(newPath) + if err == nil { + err = d.sftpClient.Remove(newPath) + if err != nil { + return fmt.Errorf("failed to remove remote file: %w", err) + } + } + + err = d.sftpClient.Rename(path, newPath) + if err != nil { + return fmt.Errorf("failed to rename remote file: %w", err) + } + + return nil +} diff --git a/destination/destination_integration_test.go b/destination/destination_integration_test.go new file mode 100644 index 0000000..52c002e --- /dev/null +++ b/destination/destination_integration_test.go @@ -0,0 +1,455 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package destination + +import ( + "context" + "fmt" + "os/exec" + "testing" + "time" + + "github.com/conduitio-labs/conduit-connector-sftp/config" + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/matryer/is" +) + +func TestDestination_Configure(t *testing.T) { + t.Parallel() + + t.Run("destination configure success", func(t *testing.T) { + is := is.New(t) + + d := NewDestination() + + err := d.Configure(context.Background(), map[string]string{ + config.ConfigAddress: "locahost:22", + config.ConfigHostKey: "host-key", + config.ConfigUsername: "root", + config.ConfigPassword: "root", + config.ConfigDirectoryPath: "/home/root", + }) + is.NoErr(err) + }) + + t.Run("destination configure failure", func(t *testing.T) { + is := is.New(t) + + d := NewDestination() + + err := d.Configure(context.Background(), map[string]string{ + config.ConfigHostKey: "host-key", + config.ConfigUsername: "root", + config.ConfigPassword: "root", + config.ConfigDirectoryPath: "/home/root", + }) + is.True(err != nil) + is.Equal(err.Error(), + `invalid config: config invalid: error validating "address": required parameter is not provided`) + }) + + t.Run("destination configure fail config validate", func(t *testing.T) { + is := is.New(t) + + d := NewDestination() + + err := d.Configure(context.Background(), map[string]string{ + config.ConfigAddress: "locahost:22", + config.ConfigHostKey: "host-key", + config.ConfigUsername: "root", + config.ConfigPassword: "", + config.ConfigDirectoryPath: "/home/root", + }) + is.True(err != nil) + is.Equal(err.Error(), + `error validating configuration: both "password" and "privateKeyPath" can not be empty`) + }) +} + +func TestDestination_Open(t *testing.T) { + t.Parallel() + + hostKey, err := setupHostKey() + if err != nil { + fmt.Println(err) + return + } + + t.Run("destination open success", func(t *testing.T) { + is := is.New(t) + + d := NewDestination() + + ctx := context.Background() + + err := d.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "user", + config.ConfigPassword: "pass", + config.ConfigDirectoryPath: "/destination", + }) + is.NoErr(err) + + err = d.Open(ctx) + is.NoErr(err) + + d.Teardown(ctx) + }) + + t.Run("destination open error sshConfig", func(t *testing.T) { + is := is.New(t) + + d := NewDestination() + + ctx := context.Background() + + err := d.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: "hostKey", + config.ConfigUsername: "user", + config.ConfigPassword: "pass", + config.ConfigDirectoryPath: "/destination", + }) + is.NoErr(err) + + err = d.Open(ctx) + is.True(err != nil) + is.Equal(err.Error(), "failed to create SSH config: failed to parse host key: ssh: no key found") + + d.Teardown(ctx) + }) + + t.Run("destination open error read private key", func(t *testing.T) { + is := is.New(t) + + d := NewDestination() + + ctx := context.Background() + + err := d.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "user", + config.ConfigPrivateKeyPath: "privatekey", + config.ConfigDirectoryPath: "/upload", + }) + is.NoErr(err) + + err = d.Open(ctx) + is.True(err != nil) + is.Equal(err.Error(), "failed to create SSH config: failed to read private key file: open privatekey: no such file or directory") + + d.Teardown(ctx) + }) + + t.Run("destination open error ssh dial", func(t *testing.T) { + is := is.New(t) + + d := NewDestination() + + ctx := context.Background() + + err := d.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:22", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "root", + config.ConfigPassword: "root", + config.ConfigDirectoryPath: "/home/root", + }) + is.NoErr(err) + + err = d.Open(ctx) + is.True(err != nil) + is.Equal(err.Error(), "failed to dial SSH: ssh: handshake failed: host key type mismatch: got ecdsa-sha2-nistp256, want ssh-rsa") + }) + + t.Run("destination open error remote path not exist", func(t *testing.T) { + is := is.New(t) + + d := NewDestination() + + ctx := context.Background() + + err := d.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "user", + config.ConfigPassword: "pass", + config.ConfigDirectoryPath: "/home/root", + }) + is.NoErr(err) + + err = d.Open(ctx) + is.True(err != nil) + is.Equal(err.Error(), "remote path does not exist: file does not exist") + }) +} + +func TestDestination_Write(t *testing.T) { + t.Parallel() + + hostKey, err := setupHostKey() + if err != nil { + fmt.Println(err) + return + } + + t.Run("destination write success", func(t *testing.T) { + is := is.New(t) + d := NewDestination() + ctx := context.Background() + + err := d.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "user", + config.ConfigPassword: "pass", + config.ConfigDirectoryPath: "/destination", + }) + is.NoErr(err) + + err = d.Open(ctx) + is.NoErr(err) + + content := []byte(`Hello World!`) + + records := []opencdc.Record{ + sdk.Util.Source.NewRecordCreate( + nil, + map[string]string{ + opencdc.MetadataCollection: "upload", + opencdc.MetadataCreatedAt: time.Now().UTC().Format(time.RFC3339), + "filename": "example.txt", + "source_path": "/upload", + "hash": "55fa9e9cb76faa2e544668384538b19a", + "file_size": fmt.Sprintf("%d", len(content)), + "mod_time": time.Now().UTC().Format(time.RFC3339), + }, + opencdc.StructuredData{"filename": "example.txt"}, + opencdc.RawData(content), + ), + } + + n, err := d.Write(ctx, records) + is.NoErr(err) + is.Equal(n, len(records)) + + d.Teardown(ctx) + }) + + t.Run("destination write success filename from key", func(t *testing.T) { + is := is.New(t) + d := NewDestination() + ctx := context.Background() + + err := d.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "user", + config.ConfigPassword: "pass", + config.ConfigDirectoryPath: "/destination", + }) + is.NoErr(err) + + err = d.Open(ctx) + is.NoErr(err) + + content := []byte(`Hello World!`) + + records := []opencdc.Record{ + sdk.Util.Source.NewRecordCreate( + nil, + map[string]string{ + opencdc.MetadataCollection: "upload", + opencdc.MetadataCreatedAt: time.Now().UTC().Format(time.RFC3339), + "source_path": "/upload", + "hash": "55fa9e9cb76faa2e544668384538b19a", + "file_size": fmt.Sprintf("%d", len(content)), + "mod_time": time.Now().UTC().Format(time.RFC3339), + }, + opencdc.StructuredData{"filename": "example.txt"}, + opencdc.RawData(content), + ), + } + + n, err := d.Write(ctx, records) + is.NoErr(err) + is.Equal(n, len(records)) + + d.Teardown(ctx) + }) + + t.Run("destination write success filename from rawdata key", func(t *testing.T) { + is := is.New(t) + d := NewDestination() + ctx := context.Background() + + err := d.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "user", + config.ConfigPassword: "pass", + config.ConfigDirectoryPath: "/destination", + }) + is.NoErr(err) + + err = d.Open(ctx) + is.NoErr(err) + + content := []byte(`Hello World!`) + + records := []opencdc.Record{ + sdk.Util.Source.NewRecordCreate( + nil, + map[string]string{ + opencdc.MetadataCollection: "upload", + opencdc.MetadataCreatedAt: time.Now().UTC().Format(time.RFC3339), + "source_path": "/upload", + "hash": "55fa9e9cb76faa2e544668384538b19a", + "file_size": fmt.Sprintf("%d", len(content)), + "mod_time": time.Now().UTC().Format(time.RFC3339), + }, + opencdc.RawData([]byte("example.txt")), + opencdc.RawData(content), + ), + } + + n, err := d.Write(ctx, records) + is.NoErr(err) + is.Equal(n, len(records)) + + d.Teardown(ctx) + }) + + t.Run("destination write failure", func(t *testing.T) { + is := is.New(t) + d := NewDestination() + ctx := context.Background() + + err := d.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "user", + config.ConfigPassword: "pass", + config.ConfigDirectoryPath: "/destination", + }) + is.NoErr(err) + + err = d.Open(ctx) + is.NoErr(err) + + content := []byte(`Hello World!`) + + records := []opencdc.Record{ + sdk.Util.Source.NewRecordCreate( + nil, + map[string]string{ + opencdc.MetadataCollection: "upload", + opencdc.MetadataCreatedAt: time.Now().UTC().Format(time.RFC3339), + "filename": "", + "source_path": "/upload", + "hash": "55fa9e9cb76faa2e544668384538b19a", + "file_size": fmt.Sprintf("%d", len(content)), + "mod_time": time.Now().UTC().Format(time.RFC3339), + }, + opencdc.StructuredData{"filename": ""}, + opencdc.RawData(content), + ), + } + + _, err = d.Write(ctx, records) + is.True(err != nil) + is.Equal(err.Error(), `failed to create remote file: sftp: "Failure" (SSH_FX_FAILURE)`) + + d.Teardown(ctx) + }) + + t.Run("destination large file upload", func(t *testing.T) { + is := is.New(t) + d := NewDestination() + ctx := context.Background() + + err := d.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "user", + config.ConfigPassword: "pass", + config.ConfigDirectoryPath: "/destination", + }) + is.NoErr(err) + + err = d.Open(ctx) + is.NoErr(err) + + records := []opencdc.Record{ + sdk.Util.Source.NewRecordCreate( + nil, + map[string]string{ + opencdc.MetadataCollection: "upload", + opencdc.MetadataCreatedAt: time.Now().UTC().Format(time.RFC3339), + "filename": "largefile.txt", + "chunk_index": "1", + "total_chunks": "2", + "is_chunked": "true", + "hash": "55fa9e9cb76faa2e544668384538b19a", + "file_size": "26", + "mod_time": time.Now().UTC().Format(time.RFC3339), + }, + opencdc.StructuredData{"filename": "largefile.txt"}, + opencdc.RawData([]byte(`Hello World!1`)), + ), + sdk.Util.Source.NewRecordCreate( + nil, + map[string]string{ + opencdc.MetadataCollection: "upload", + opencdc.MetadataCreatedAt: time.Now().UTC().Format(time.RFC3339), + "filename": "largefile.txt", + "chunk_index": "2", + "total_chunks": "2", + "is_chunked": "true", + "hash": "55fa9e9cb76faa2e544668384538b19a", + "file_size": "26", + "mod_time": time.Now().UTC().Format(time.RFC3339), + }, + opencdc.StructuredData{"filename": "largefile.txt"}, + opencdc.RawData([]byte(`Hello World!2`)), + ), + } + + n, err := d.Write(ctx, records) + is.NoErr(err) + is.Equal(n, len(records)) + + d.Teardown(ctx) + }) +} + +func TestTeardown_NoOpen(t *testing.T) { + is := is.New(t) + con := NewDestination() + err := con.Teardown(context.Background()) + is.NoErr(err) +} + +func setupHostKey() (string, error) { + cmd := exec.Command("ssh-keyscan", "-t", "rsa", "-p", "2222", "localhost") + output, err := cmd.Output() + if err != nil { + return "", fmt.Errorf("error setupHostKey: %w", err) + } + return string(output), nil +} diff --git a/destination/error.go b/destination/error.go new file mode 100644 index 0000000..a599a70 --- /dev/null +++ b/destination/error.go @@ -0,0 +1,43 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package destination + +import ( + "fmt" +) + +type InvalidChunkError struct { + message string +} + +func (e InvalidChunkError) Error() string { + return fmt.Sprintf("invalid chunk: %s", e.message) +} + +func NewInvalidChunkError(msg string) InvalidChunkError { + return InvalidChunkError{msg} +} + +type InvalidFileError struct { + message string +} + +func (e InvalidFileError) Error() string { + return fmt.Sprintf("invalid file: %s", e.message) +} + +func NewInvalidFileError(msg string) InvalidFileError { + return InvalidFileError{msg} +} diff --git a/source/config/paramgen.go b/source/config/paramgen.go index 2424853..29b4600 100644 --- a/source/config/paramgen.go +++ b/source/config/paramgen.go @@ -70,7 +70,7 @@ func (Config) Parameters() map[string]config.Parameter { }, ConfigUsername: { Default: "", - Description: "User is the SFTP user.", + Description: "User is the username of the SFTP user.", Type: config.ParameterTypeString, Validations: []config.Validation{ config.ValidationRequired{}, diff --git a/source/iterator.go b/source/iterator.go index a4aa4b4..aba08c6 100644 --- a/source/iterator.go +++ b/source/iterator.go @@ -16,8 +16,6 @@ package source import ( "context" - "crypto/md5" //nolint: gosec // MD5 used for non-cryptographic unique identifier - "encoding/hex" "encoding/json" "errors" "fmt" @@ -29,6 +27,7 @@ import ( "sort" "time" + "github.com/conduitio-labs/conduit-connector-sftp/common" "github.com/conduitio-labs/conduit-connector-sftp/source/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-connector-sdk" @@ -138,7 +137,7 @@ func (iter *Iterator) processFile(ctx context.Context, fileInfo fileInfo) (openc return sdk.Util.Source.NewRecordCreate( positionBytes, result.metadata, - opencdc.StructuredData{"filename": fileInfo.name}, + opencdc.RawData([]byte(fileInfo.name)), opencdc.RawData(result.content), ), nil } @@ -182,7 +181,7 @@ func (iter *Iterator) readFileContent(ctx context.Context, file *sftp.File, stat } result.content = content - result.metadata = iter.createMetadata(stat, filePath, len(content)) + result.metadata = iter.createMetadata(stat, filePath, stat.Size()) return result, nil } @@ -224,7 +223,7 @@ func (iter *Iterator) readLargeFileChunk(ctx context.Context, file *sftp.File, s result.content = chunk - result.metadata = iter.createMetadata(stat, filePath, len(chunk)) + result.metadata = iter.createMetadata(stat, filePath, stat.Size()) result.metadata["chunk_index"] = fmt.Sprintf("%d", result.chunkIndex) result.metadata["total_chunks"] = fmt.Sprintf("%d", result.totalChunks) result.metadata["is_chunked"] = "true" @@ -317,21 +316,14 @@ func (iter *Iterator) validateFile(ctx context.Context, fileInfo os.FileInfo, fi return nil } -func (iter *Iterator) createMetadata(fileInfo os.FileInfo, filePath string, contentLength int) opencdc.Metadata { +func (iter *Iterator) createMetadata(fileInfo os.FileInfo, filePath string, filesize int64) opencdc.Metadata { return opencdc.Metadata{ opencdc.MetadataCollection: iter.config.DirectoryPath, opencdc.MetadataCreatedAt: time.Now().UTC().Format(time.RFC3339), "filename": fileInfo.Name(), "source_path": filePath, - "file_size": fmt.Sprintf("%d", contentLength), + "file_size": fmt.Sprintf("%d", filesize), "mod_time": fileInfo.ModTime().UTC().Format(time.RFC3339), - "hash": generateFileHash(fileInfo.Name(), fileInfo.ModTime().UTC(), fileInfo.Size()), + "hash": common.GenerateFileHash(fileInfo.Name(), fileInfo.ModTime().UTC(), fileInfo.Size()), } } - -// generateFileHash creates a unique hash based on file name, mod time, and size. -func generateFileHash(fileName string, modTime time.Time, fileSize int64) string { - data := fmt.Sprintf("%s|%s|%d", fileName, modTime.Format(time.RFC3339), fileSize) - hash := md5.Sum([]byte(data)) //nolint: gosec // MD5 used for non-cryptographic unique identifier - return hex.EncodeToString(hash[:]) -} diff --git a/source/source.go b/source/source.go index 3e391a7..c02e299 100644 --- a/source/source.go +++ b/source/source.go @@ -18,8 +18,8 @@ import ( "context" "errors" "fmt" - "os" + "github.com/conduitio-labs/conduit-connector-sftp/common" "github.com/conduitio-labs/conduit-connector-sftp/source/config" commonsConfig "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/lang" @@ -44,9 +44,10 @@ type Source struct { // NewSource initialises a new source. func NewSource() sdk.Source { return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware( - // disable schema extraction by default, because the source produces raw payload data + // disable schema extraction by default, because the source produces raw key and payload data sdk.SourceWithSchemaExtractionConfig{ PayloadEnabled: lang.Ptr(false), + KeyEnabled: lang.Ptr(false), }, )...) } @@ -73,7 +74,7 @@ func (s *Source) Configure(ctx context.Context, cfgRaw commonsConfig.Config) err func (s *Source) Open(ctx context.Context, position opencdc.Position) error { sdk.Logger(ctx).Info().Msg("Opening a SFTP Source...") - sshConfig, err := s.sshConfigAuth() + sshConfig, err := common.SSHConfigAuth(s.config.HostKey, s.config.Username, s.config.Password, s.config.PrivateKeyPath) if err != nil { return fmt.Errorf("failed to create SSH config: %w", err) } @@ -139,52 +140,3 @@ func (s *Source) Teardown(ctx context.Context) error { return errors.Join(errs...) } - -func (s *Source) sshConfigAuth() (*ssh.ClientConfig, error) { - sshConfig := &ssh.ClientConfig{ - User: s.config.Username, - } - - //nolint:dogsled // not required here. - hostKey, _, _, _, err := ssh.ParseAuthorizedKey([]byte(s.config.HostKey)) - if err != nil { - return nil, fmt.Errorf("failed to parse host key: %w", err) - } - - sshConfig.HostKeyCallback = ssh.FixedHostKey(hostKey) - - if s.config.PrivateKeyPath != "" { - auth, err := s.authWithPrivateKey() - if err != nil { - return nil, err - } - - sshConfig.Auth = []ssh.AuthMethod{auth} - return sshConfig, nil - } - - sshConfig.Auth = []ssh.AuthMethod{ssh.Password(s.config.Password)} - return sshConfig, nil -} - -func (s *Source) authWithPrivateKey() (ssh.AuthMethod, error) { - key, err := os.ReadFile(s.config.PrivateKeyPath) - if err != nil { - return nil, fmt.Errorf("failed to read private key file: %w", err) - } - - if s.config.Password != "" { - signer, err := ssh.ParsePrivateKeyWithPassphrase(key, []byte(s.config.Password)) - if err != nil { - return nil, fmt.Errorf("failed to parse private key: %w", err) - } - return ssh.PublicKeys(signer), nil - } - - signer, err := ssh.ParsePrivateKey(key) - if err != nil { - return nil, fmt.Errorf("failed to parse private key: %w", err) - } - - return ssh.PublicKeys(signer), nil -} diff --git a/source/source_integration_test.go b/source/source_integration_test.go index 10f1bcd..0096a5d 100644 --- a/source/source_integration_test.go +++ b/source/source_integration_test.go @@ -184,7 +184,7 @@ func TestSource_Open(t *testing.T) { err = s.Open(ctx, nil) is.True(err != nil) - is.Equal(err.Error(), "failed to dial SSH: ssh: handshake failed: ssh: host key mismatch") + is.Equal(err.Error(), "failed to dial SSH: ssh: handshake failed: host key type mismatch: got ecdsa-sha2-nistp256, want ssh-rsa") }) t.Run("source open error remote path not exist", func(t *testing.T) { diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 20292d5..1850259 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -9,4 +9,4 @@ services: environment: - USER=user - PASS=pass - command: user:pass:::source,destination \ No newline at end of file + command: user:pass:::source,destination