Skip to content

Commit

Permalink
Destination connector implementation. (#1)
Browse files Browse the repository at this point in the history
* feat: package sftp

* feat: updated initial template and added config

* feat: updated config

* feat: destination configuration

* feat: config test and error handling

* feat: initial source implementation

* feat: sftp client

* feat: upload file

* feat: handled host key callback

* feat: adding test cases

* fix: lint

* seperated source logic into iterator

* feat: acceptance

* fix: small fix

* feat: readme

* added test cases

* added integration test

* remove go generate directive from source

* fix: test cases

* fix: teardown

* added go header

* added file chunk mechanism for files larger than 3 mb

* added configurable chunk

* fix: updated readme

* fix: large file processing

* feat: upload large file

* fix: refactored iterator

* fix: handle file modification while read

* fix: source integration test

* modify README file

* feat: merged source

* fix: source acceptance to be handled separately

* fix: refactored iterator to provide record on demand

* feat: handled missing filename in metadata

* added source and destination directories in docker compose

* fix: test workflow

* fix: refactored

* fix: source and destination tests

* fix: linters

* fix: actual filesize in source metadata and handle filename from rawdata key

* fix: source raw key

* fix: source middleware

---------

Co-authored-by: Gaurav Sahil <[email protected]>
  • Loading branch information
parikshitg and Gaurav Sahil authored Jan 15, 2025
1 parent 1aca814 commit f161082
Show file tree
Hide file tree
Showing 15 changed files with 1,085 additions and 75 deletions.
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,24 @@ The connector supports both password and private key authentication methods.

## Destination

Destination connects to a remote server. It takes an `opencdc.Record`, extracts filename from the metadata and upload the file to the remote server. The connector supports both password and private key authentication methods. The connector will sync only those files that are in the source directory itself.
Destination also supports large file uploads. Source can provide large file content chunk by chunk (one record per chunk). Each record should have following in metadata:

* `filename`: Filename of the file with extension.
* `file_size`: Integer size of the file.
* `chunk_index`: Index of the chunk (starting from 1).
* `total_chunks`: Total number of chunks.
* `hash`: Unique hash, which is used to create temporary file till the last chunk is uploaded.

### Configuration Options

![scarf pixel](https://static.scarf.sh/a.png?x-pxid=64b333ae-77ad-4895-a5cd-a73bb14362d9)
| name | description | required |
| -------------- | ----------------------------------------------------------------------------------------------------- | -------- |
| `address` | Address is the address of the sftp server to connect.| **true** |
| `hostKey` | HostKey is the key used for host key callback validation.| **true** |
| `username`| User is the username of the SFTP user. | **true** |
| `password`| Password is the SFTP password (can be used as passphrase for private key). | false |
| `privateKeyPath`| PrivateKeyPath is the private key for ssh login.| false |
| `directoryPath` | DirectoryPath is the path to the directory to read/write data. | true |

![scarf pixel](https://static.scarf.sh/a.png?x-pxid=64b333ae-77ad-4895-a5cd-a73bb14362d9)
95 changes: 95 additions & 0 deletions acceptance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sftp

import (
"fmt"
"os/exec"
"sync/atomic"
"testing"
"time"

"github.com/conduitio-labs/conduit-connector-sftp/common"
"github.com/conduitio-labs/conduit-connector-sftp/config"
"github.com/conduitio-labs/conduit-connector-sftp/destination"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
)

type driver struct {
sdk.ConfigurableAcceptanceTestDriver
id int64
}

func (d *driver) GenerateRecord(_ *testing.T, _ opencdc.Operation) opencdc.Record {
atomic.AddInt64(&d.id, 1)

content := []byte("hello world")
filename := fmt.Sprintf("%d.txt", d.id)

return sdk.Util.Source.NewRecordCreate(
nil,
map[string]string{
opencdc.MetadataCollection: "upload",
opencdc.MetadataCreatedAt: time.Now().UTC().Format(time.RFC3339),
"filename": filename,
"hash": common.GenerateFileHash(filename, time.Now(), 11),
"file_size": fmt.Sprintf("%d", len(content)),
"mod_time": time.Now().UTC().Format(time.RFC3339),
},
opencdc.StructuredData{"filename": filename},
opencdc.RawData(content),
)
}

func (d *driver) ReadFromDestination(_ *testing.T, records []opencdc.Record) []opencdc.Record {
return records
}

func TestAcceptance(t *testing.T) {
hostKey, err := setupHostKey()
if err != nil {
fmt.Println(err)
return
}

sdk.AcceptanceTest(t, &driver{
ConfigurableAcceptanceTestDriver: sdk.ConfigurableAcceptanceTestDriver{
Config: sdk.ConfigurableAcceptanceTestDriverConfig{
Connector: sdk.Connector{
NewSpecification: Specification,
NewDestination: destination.NewDestination,
NewSource: nil,
},
DestinationConfig: map[string]string{
config.ConfigAddress: "localhost:2222",
config.ConfigHostKey: hostKey,
config.ConfigUsername: "user",
config.ConfigPassword: "pass",
config.ConfigDirectoryPath: "/destination",
},
},
},
})
}

func setupHostKey() (string, error) {
cmd := exec.Command("ssh-keyscan", "-t", "rsa", "-p", "2222", "localhost")
output, err := cmd.Output()
if err != nil {
return "", fmt.Errorf("error setupHostKey: %w", err)
}
return string(output), nil
}
29 changes: 29 additions & 0 deletions common/hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"crypto/md5" //nolint: gosec // MD5 used for non-cryptographic unique identifier
"encoding/hex"
"fmt"
"time"
)

// GenerateFileHash creates a unique hash based on file name, mod time, and size.
func GenerateFileHash(fileName string, modTime time.Time, fileSize int64) string {
data := fmt.Sprintf("%s|%s|%d", fileName, modTime.Format(time.RFC3339), fileSize)
hash := md5.Sum([]byte(data)) //nolint: gosec // MD5 used for non-cryptographic unique identifier
return hex.EncodeToString(hash[:])
}
98 changes: 98 additions & 0 deletions common/sshauth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"bytes"
"fmt"
"net"
"os"

"golang.org/x/crypto/ssh"
)

var ErrUntrustedKey = fmt.Errorf("host key does not match the trusted key")

type MismatchKeyTypeError struct {
key1, key2 string
}

func (e MismatchKeyTypeError) Error() string {
return fmt.Sprintf("host key type mismatch: got %s, want %s", e.key1, e.key2)
}

func NewMismatchKeyTypeError(key1, key2 string) MismatchKeyTypeError {
return MismatchKeyTypeError{key1, key2}
}

func SSHConfigAuth(remoteHostKey, username, password, privateKeyPath string) (*ssh.ClientConfig, error) {
//nolint:dogsled // not required here.
hostKey, _, _, _, err := ssh.ParseAuthorizedKey([]byte(remoteHostKey))
if err != nil {
return nil, fmt.Errorf("failed to parse host key: %w", err)
}

hostKeyCallback := func(_ string, _ net.Addr, key ssh.PublicKey) error {
if key.Type() != hostKey.Type() {
return NewMismatchKeyTypeError(key.Type(), hostKey.Type())
}

if !bytes.Equal(key.Marshal(), hostKey.Marshal()) {
return ErrUntrustedKey
}

return nil
}

sshConfig := &ssh.ClientConfig{
User: username,
HostKeyCallback: hostKeyCallback,
}

if privateKeyPath != "" {
auth, err := authWithPrivateKey(privateKeyPath, password)
if err != nil {
return nil, err
}

sshConfig.Auth = []ssh.AuthMethod{auth}
return sshConfig, nil
}

sshConfig.Auth = []ssh.AuthMethod{ssh.Password(password)}
return sshConfig, nil
}

func authWithPrivateKey(privateKeyPath, password string) (ssh.AuthMethod, error) {
key, err := os.ReadFile(privateKeyPath)
if err != nil {
return nil, fmt.Errorf("failed to read private key file: %w", err)
}

if password != "" {
signer, err := ssh.ParsePrivateKeyWithPassphrase(key, []byte(password))
if err != nil {
return nil, fmt.Errorf("failed to parse private key: %w", err)
}
return ssh.PublicKeys(signer), nil
}

signer, err := ssh.ParsePrivateKey(key)
if err != nil {
return nil, fmt.Errorf("failed to parse private key: %w", err)
}

return ssh.PublicKeys(signer), nil
}
6 changes: 4 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package config

import "fmt"
import (
"fmt"
)

var ErrEmptyAuthFields = fmt.Errorf("both %q and %q can not be empty", ConfigPassword, ConfigPrivateKeyPath)

Expand All @@ -26,7 +28,7 @@ type Config struct {
Address string `json:"address" validate:"required"`
// HostKey is the key used for host key callback validation.
HostKey string `json:"hostKey" validate:"required"`
// User is the SFTP user.
// User is the username of the SFTP user.
Username string `json:"username" validate:"required"`
// Password is the SFTP password (can be used as passphrase for private key).
Password string `json:"password"`
Expand Down
2 changes: 1 addition & 1 deletion config/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
package sftp

import (
"github.com/conduitio-labs/conduit-connector-sftp/destination"
source "github.com/conduitio-labs/conduit-connector-sftp/source"
sdk "github.com/conduitio/conduit-connector-sdk"
)

// Connector combines all constructors for each plugin in one struct.
var Connector = sdk.Connector{
NewSpecification: Specification,
NewDestination: destination.NewDestination,
NewSource: source.NewSource,
NewDestination: nil,
}
Loading

0 comments on commit f161082

Please sign in to comment.