Skip to content

Commit

Permalink
feat: mapping column + readme for oss as source
Browse files Browse the repository at this point in the history
  • Loading branch information
deryrahman committed Jan 30, 2025
1 parent bc949c5 commit b95b3e9
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 15 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ It expects configuration from env variables. Or you can pass configuration from
| MC | MC__SERVICE_ACCOUNT | Service account for MaxCompute. |
| | MC__QUERY_FILE_PATH | Path to the query file. (default: /data/in/query.sql) |
| | MC__EXECUTION_PROJECT | Project ID for the query execution. |

| OSS | OSS__SERVICE_ACCOUNT | Service account for OSS. |
| | OSS__SOURCE_BUCKET_PATH | The source path in a OSS bucket to read the files. Must include the OSS bucket name. |
| | OSS__FILE_FORMAT | File format availability: CSV, JSON. (default: JSON) |
| | OSS__CSV_DELIMITER | Delimiter for CSV file format. (default: ,) |
| | OSS__COLUMN_MAPPING_FILE_PATH | Path to the mapping column for the record result. "" for ignore (default: "") |
## Supported Sinks

| Component | Configuration | Description |
Expand Down
34 changes: 20 additions & 14 deletions ext/oss/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strings"

"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
extcommon "github.com/goto/optimus-any2any/ext/common"
"github.com/goto/optimus-any2any/internal/component/option"
"github.com/goto/optimus-any2any/internal/component/source"
"github.com/goto/optimus-any2any/pkg/flow"
Expand All @@ -29,13 +30,15 @@ type OSSSource struct {
pathPrefix string
fileFormat string
csvDelimiter rune
columnMap map[string]string
}

var _ flow.Source = (*OSSSource)(nil)

// NewSource creates a new OSSSource.
func NewSource(ctx context.Context, l *slog.Logger, svcAcc string,
sourceBucketPath, fileFormat string, csvDelimiter rune, opts ...option.Option) (*OSSSource, error) {
sourceBucketPath, fileFormat string, csvDelimiter rune,
columnMappingFilePath string, opts ...option.Option) (*OSSSource, error) {
// create commonSource source
commonSource := source.NewCommonSource(l, opts...)

Expand All @@ -50,6 +53,11 @@ func NewSource(ctx context.Context, l *slog.Logger, svcAcc string,
if err != nil {
return nil, errors.WithStack(err)
}
// read column map
columnMap, err := extcommon.GetColumnMap(columnMappingFilePath)
if err != nil {
return nil, errors.WithStack(err)
}

ossSource := &OSSSource{
CommonSource: commonSource,
Expand All @@ -59,6 +67,7 @@ func NewSource(ctx context.Context, l *slog.Logger, svcAcc string,
pathPrefix: strings.TrimPrefix(parsedURL.Path, "/"),
fileFormat: fileFormat,
csvDelimiter: csvDelimiter,
columnMap: columnMap,
}

// add clean function
Expand Down Expand Up @@ -110,12 +119,19 @@ func (o *OSSSource) process() {

// send records
for _, record := range records {
o.Send(record)
mappedRecord := extcommon.KeyMapping(o.columnMap, record)
raw, err := json.Marshal(mappedRecord)
if err != nil {
o.Logger.Error(fmt.Sprintf("source(oss): failed to marshal record: %s", err.Error()))
o.SetError(errors.WithStack(err))
continue
}
o.Send(raw)
}
}
}

func (o *OSSSource) unpackRecords(object *oss.GetObjectResult) ([][]byte, error) {
func (o *OSSSource) unpackRecords(object *oss.GetObjectResult) ([]map[string]interface{}, error) {
// unmarshal object based on file format
var (
records []map[string]interface{}
Expand All @@ -132,17 +148,7 @@ func (o *OSSSource) unpackRecords(object *oss.GetObjectResult) ([][]byte, error)
if err != nil {
return nil, errors.WithStack(err)
}

// marshal records
raws := make([][]byte, 0, len(records))
for _, record := range records {
raw, err := json.Marshal(record)
if err != nil {
return nil, errors.WithStack(err)
}
raws = append(raws, raw)
}
return raws, nil
return records, nil
}

func (o *OSSSource) unmarshalCSV(object *oss.GetObjectResult) ([]map[string]interface{}, error) {
Expand Down
6 changes: 6 additions & 0 deletions internal/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ func GetSource(ctx context.Context, l *slog.Logger, source Type, cfg *config.Con
return gmail.NewSource(ctx, l, sourceCfg.Token, sourceCfg.Filter,
sourceCfg.ExtractorSource, sourceCfg.ExtractorPattern, sourceCfg.ExtractorFileFormat,
sourceCfg.FilenameColumn, sourceCfg.ColumnMappingFilePath, opts...)
case OSS:
sourceCfg, err := config.SourceOSS(envs...)
if err != nil {
return nil, errors.WithStack(err)
}
return oss.NewSource(ctx, l, sourceCfg.ServiceAccount, sourceCfg.SourceBucketPath, sourceCfg.FileFormat, sourceCfg.CSVDelimiter, sourceCfg.ColumnMappingFilePath, opts...)
case IO:
}
return nil, fmt.Errorf("source: unknown source: %s", source)
Expand Down
15 changes: 15 additions & 0 deletions internal/config/source_oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package config

// SourceOSSConfig is a configuration for the source oss component.
type SourceOSSConfig struct {
ServiceAccount string `env:"OSS__SERVICE_ACCOUNT"`
SourceBucketPath string `env:"OSS__SOURCE_BUCKET_PATH"`
FileFormat string `env:"OSS__FILE_FORMAT" envDefault:"json"`
CSVDelimiter rune `env:"OSS__CSV_DELIMITER" envDefault:","`
ColumnMappingFilePath string `env:"OSS__COLUMN_MAPPING_FILE_PATH"`
}

// SourceOSS parses the environment variables and returns the source oss configuration.
func SourceOSS(envs ...string) (*SourceOSSConfig, error) {
return parse[SourceOSSConfig](envs...)
}

0 comments on commit b95b3e9

Please sign in to comment.