Merge pull request #19 from goto/oss-source
chore(oss-source): initial implementation oss source
deryrahman authored Jan 30, 2025
2 parents 1cf747d + b95b3e9 commit b5b83f4
Showing 4 changed files with 248 additions and 1 deletion.
6 changes: 5 additions & 1 deletion README.md
@@ -66,7 +66,11 @@ It expects configuration from env variables. Or you can pass configuration from
| MC | MC__SERVICE_ACCOUNT | Service account for MaxCompute. |
| | MC__QUERY_FILE_PATH | Path to the query file. (default: /data/in/query.sql) |
| | MC__EXECUTION_PROJECT | Project ID for the query execution. |
| OSS | OSS__SERVICE_ACCOUNT | Service account for OSS. |
| | OSS__SOURCE_BUCKET_PATH | The source path in an OSS bucket to read files from. Must include the OSS bucket name. |
| | OSS__FILE_FORMAT | File format of the source objects: CSV or JSON. (default: JSON) |
| | OSS__CSV_DELIMITER | Delimiter for the CSV file format. (default: ,) |
| | OSS__COLUMN_MAPPING_FILE_PATH | Path to the column mapping file applied to each record. Leave empty ("") to skip mapping. (default: "") |
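
For example, a minimal OSS source configuration could look like this (the credentials placeholder, bucket name, and path below are illustrative, not taken from this change):

```
OSS__SERVICE_ACCOUNT=<oss service account credentials>
OSS__SOURCE_BUCKET_PATH=my-bucket/path/to/input
OSS__FILE_FORMAT=CSV
OSS__CSV_DELIMITER=,
OSS__COLUMN_MAPPING_FILE_PATH=""
```
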
## Supported Sinks

| Component | Configuration | Description |
222 changes: 222 additions & 0 deletions ext/oss/source.go
@@ -0,0 +1,222 @@
package oss

import (
"bufio"
"bytes"
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/url"
"strings"

"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
extcommon "github.com/goto/optimus-any2any/ext/common"
"github.com/goto/optimus-any2any/internal/component/option"
"github.com/goto/optimus-any2any/internal/component/source"
"github.com/goto/optimus-any2any/pkg/flow"
"github.com/pkg/errors"
)

// OSSSource is the source component for OSS.
type OSSSource struct {
*source.CommonSource

ctx context.Context
client *oss.Client
bucket string
pathPrefix string
fileFormat string
csvDelimiter rune
columnMap map[string]string
}

var _ flow.Source = (*OSSSource)(nil)

// NewSource creates a new OSSSource.
func NewSource(ctx context.Context, l *slog.Logger, svcAcc string,
sourceBucketPath, fileFormat string, csvDelimiter rune,
columnMappingFilePath string, opts ...option.Option) (*OSSSource, error) {
// create the common source
commonSource := source.NewCommonSource(l, opts...)

// create OSS client
client, err := NewOSSClient(svcAcc)
if err != nil {
return nil, errors.WithStack(err)
}

// parse source bucket path
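// e.g. (illustrative) "my-bucket/path/to/input" yields bucket "my-bucket"
// and pathPrefix "path/to/input"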
parsedURL, err := url.Parse(fmt.Sprintf("oss://%s", sourceBucketPath))
if err != nil {
return nil, errors.WithStack(err)
}
// read column map
columnMap, err := extcommon.GetColumnMap(columnMappingFilePath)
if err != nil {
return nil, errors.WithStack(err)
}

ossSource := &OSSSource{
CommonSource: commonSource,
ctx: ctx,
client: client,
bucket: parsedURL.Host,
pathPrefix: strings.TrimPrefix(parsedURL.Path, "/"),
fileFormat: fileFormat,
csvDelimiter: csvDelimiter,
columnMap: columnMap,
}

// add clean function
commonSource.AddCleanFunc(func() {
commonSource.Logger.Debug("source(oss): cleaning up")
})

// register source process
commonSource.RegisterProcess(ossSource.process)

return ossSource, nil
}

func (o *OSSSource) process() {
// list objects
objectResult, err := o.client.ListObjects(o.ctx, &oss.ListObjectsRequest{
Bucket: oss.Ptr(o.bucket),
Prefix: oss.Ptr(o.pathPrefix),
})
if err != nil {
o.Logger.Error(fmt.Sprintf("source(oss): failed to list objects in bucket: %s", o.bucket))
o.SetError(errors.WithStack(err))
return
}
if len(objectResult.Contents) == 0 {
o.Logger.Info("sink(oss): no objects found")
return
}

// process objects
for _, objectProp := range objectResult.Contents {
// download object
object, err := o.client.GetObject(o.ctx, &oss.GetObjectRequest{
Bucket: oss.Ptr(o.bucket),
Key: objectProp.Key,
})
if err != nil {
o.Logger.Warn(fmt.Sprintf("source(oss): failed to download, skipping the object: %s. error: %s", oss.ToString(objectProp.Key), err.Error()))
continue
}

// unpack records
records, err := o.unpackRecords(object)
if err != nil {
o.Logger.Error(fmt.Sprintf("source(oss): failed to unpack records from object: %s", oss.ToString(objectProp.Key)))
o.SetError(errors.WithStack(err))
return
}

// send records
for _, record := range records {
mappedRecord := extcommon.KeyMapping(o.columnMap, record)
raw, err := json.Marshal(mappedRecord)
if err != nil {
o.Logger.Error(fmt.Sprintf("source(oss): failed to marshal record: %s", err.Error()))
o.SetError(errors.WithStack(err))
continue
}
o.Send(raw)
}
}
}

func (o *OSSSource) unpackRecords(object *oss.GetObjectResult) ([]map[string]interface{}, error) {
// unmarshal object based on file format
var (
records []map[string]interface{}
err error
)
switch strings.ToLower(o.fileFormat) {
case "csv":
records, err = o.unmarshalCSV(object)
case "json":
records, err = o.unmarshalJSON(object)
default:
err = errors.Errorf("unsupported file format: %s", o.fileFormat)
}
if err != nil {
return nil, errors.WithStack(err)
}
return records, nil
}

func (o *OSSSource) unmarshalCSV(object *oss.GetObjectResult) ([]map[string]interface{}, error) {
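// Illustrative note (example data, not from the original change): a CSV
// object such as
//
//	id,name
//	1,alice
//
// is converted into a record equivalent to {"id":"1","name":"alice"};
// the first row is used as the header and all values stay strings.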
// read object content
raw, err := io.ReadAll(object.Body)
if err != nil {
return nil, errors.WithStack(err)
}

// unmarshal CSV
reader := csv.NewReader(bytes.NewReader(raw))

// set delimiter
if o.csvDelimiter != 0 {
reader.Comma = o.csvDelimiter
}

// read all records
rows, err := reader.ReadAll()
if err != nil {
return nil, errors.WithStack(err)
}

// convert rows to records; the first row is treated as the header
if len(rows) == 0 {
return nil, nil
}
headers := rows[0]
records := make([]map[string]interface{}, 0, len(rows)-1)
for _, row := range rows[1:] {
record := make(map[string]interface{}, len(headers))
for i, header := range headers {
record[header] = row[i]
}
records = append(records, record)
}

return records, nil
}

func (o *OSSSource) unmarshalJSON(object *oss.GetObjectResult) ([]map[string]interface{}, error) {
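// The object body is expected to be newline-delimited JSON, one object per
// line, e.g. (illustrative):
//
//	{"id":"1","name":"alice"}
//	{"id":"2","name":"bob"}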
// read object content
raw, err := io.ReadAll(object.Body)
if err != nil {
return nil, errors.WithStack(err)
}

// read records
records := make([]map[string]interface{}, 0)
reader := bufio.NewReader(bytes.NewReader(raw))
for {
// read line; ReadBytes may return data together with io.EOF
// when the last line has no trailing newline
line, err := reader.ReadBytes('\n')
if err != nil && !errors.Is(err, io.EOF) {
return nil, errors.WithStack(err)
}

// remove the trailing newline and skip empty lines
line = bytes.TrimSuffix(line, []byte("\n"))
if len(line) > 0 {
// unmarshal JSON
var record map[string]interface{}
if unmarshalErr := json.Unmarshal(line, &record); unmarshalErr != nil {
return nil, errors.WithStack(unmarshalErr)
}
records = append(records, record)
}

// stop after the last line has been processed
if errors.Is(err, io.EOF) {
break
}
}

return records, nil
}
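
To make the data flow concrete, here is a small, self-contained sketch of what the source does with a CSV object: parse the rows, apply an optional column mapping, and emit each record as a JSON line. It uses only the standard library; `columnMap` below is a hypothetical stand-in for the file behind `OSS__COLUMN_MAPPING_FILE_PATH` (loaded via `extcommon.GetColumnMap` and applied by `extcommon.KeyMapping`), and printing stands in for `o.Send`.

```go
package main

import (
	"encoding/csv"
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	// A CSV object body, as the source would download it from OSS.
	body := "id,name\n1,alice\n2,bob\n"

	// Parse the rows; the first row is the header (mirrors unmarshalCSV).
	rows, err := csv.NewReader(strings.NewReader(body)).ReadAll()
	if err != nil {
		panic(err)
	}
	headers := rows[0]

	// Hypothetical column mapping: rename "id" to "user_id".
	columnMap := map[string]string{"id": "user_id"}

	for _, row := range rows[1:] {
		record := map[string]interface{}{}
		for i, header := range headers {
			key := header
			if mapped, ok := columnMap[header]; ok {
				key = mapped
			}
			record[key] = row[i]
		}
		// Each record is marshalled to JSON before being sent downstream.
		raw, _ := json.Marshal(record)
		fmt.Println(string(raw))
	}
	// Output:
	// {"name":"alice","user_id":"1"}
	// {"name":"bob","user_id":"2"}
}
```
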
6 changes: 6 additions & 0 deletions internal/component/component.go
@@ -72,6 +72,12 @@ func GetSource(ctx context.Context, l *slog.Logger, source Type, cfg *config.Con
return gmail.NewSource(ctx, l, sourceCfg.Token, sourceCfg.Filter,
sourceCfg.ExtractorSource, sourceCfg.ExtractorPattern, sourceCfg.ExtractorFileFormat,
sourceCfg.FilenameColumn, sourceCfg.ColumnMappingFilePath, opts...)
case OSS:
sourceCfg, err := config.SourceOSS(envs...)
if err != nil {
return nil, errors.WithStack(err)
}
return oss.NewSource(ctx, l, sourceCfg.ServiceAccount, sourceCfg.SourceBucketPath, sourceCfg.FileFormat, sourceCfg.CSVDelimiter, sourceCfg.ColumnMappingFilePath, opts...)
case IO:
}
return nil, fmt.Errorf("source: unknown source: %s", source)
15 changes: 15 additions & 0 deletions internal/config/source_oss.go
@@ -0,0 +1,15 @@
package config

// SourceOSSConfig is a configuration for the source oss component.
type SourceOSSConfig struct {
ServiceAccount string `env:"OSS__SERVICE_ACCOUNT"`
SourceBucketPath string `env:"OSS__SOURCE_BUCKET_PATH"`
FileFormat string `env:"OSS__FILE_FORMAT" envDefault:"json"`
CSVDelimiter rune `env:"OSS__CSV_DELIMITER" envDefault:","`
ColumnMappingFilePath string `env:"OSS__COLUMN_MAPPING_FILE_PATH"`
}

// SourceOSS parses the environment variables and returns the source oss configuration.
func SourceOSS(envs ...string) (*SourceOSSConfig, error) {
return parse[SourceOSSConfig](envs...)
}
