refactor: rename service account to credentials
deryrahman committed Feb 3, 2025
1 parent 5e3ed04 commit 3f2fbb4
Showing 14 changed files with 31 additions and 33 deletions.
18 changes: 9 additions & 9 deletions README.md
@@ -34,14 +34,14 @@ It expects configuration from env variables. Or you can pass configuration from
 ```sh
 ./any2any --from=file --to=mc \
 --env="FILE__PATH=./in.txt" \
---env="MC__SERVICE_ACCOUNT=svc_account" \
+--env="MC__CREDENTIALS='{creds:1}'" \
 --env="MC__DESTINATION_TABLE_ID=project.sample.table"
 ```

 ### Configuration convention
 `<source/sink>__<config_name>`, example:
 - FILE__PATH: Path to the input file.
-- MC__SERVICE_ACCOUNT: Service account for MaxCompute.
+- MC__CREDENTIALS: Credentials for MaxCompute.
 - MC__DESTINATION_TABLE_ID: Destination table ID in MaxCompute.

@@ -53,7 +53,7 @@ You can use the JQ processor to filter or transform data before transferring it
 ```sh
 ./any2any --from=file --to=mc \
 --env="FILE__PATH=./in.txt" \
---env="MC__SERVICE_ACCOUNT=svc_account" \
+--env="MC__CREDENTIALS='{creds:1}'" \
 --env="MC__DESTINATION_TABLE_ID=project.sample.table" \
 --env="JQ__QUERY=.[] | select(.age > 30)"
 ```
@@ -65,7 +65,7 @@ It applies when sink and source are in the same environment. For example, transf
 ```sh
 ./any2any --from=oss --to=mc --no-pipeline \
 --env="OSS2MC__SOURCE_BUCKET_PATH=bucket/path" \
---env="OSS2MC__SERVICE_ACCOUNT=svc_account" \
+--env="OSS2MC__CREDENTIALS='{creds:1}'" \
 --env="OSS2MC__DESTINATION_TABLE_ID=project.sample.table"
 ```

@@ -88,10 +88,10 @@ It applies when sink and source are in the same environment. For example, transf
 | | GMAIL__EXTRACTOR_FILE_FORMAT | Which format of file to be extracted (csv, json) (default: csv) |
 | | GMAIL__FILENAME_COLUMN | Column name to retain filename of downloaded file. "" for ignore (default: "__FILENAME__") |
 | | GMAIL__COLUMN_MAPPING_FILE_PATH | Path to the mapping column for gmail record result. "" for ignore (default: "") |
-| MC | MC__SERVICE_ACCOUNT | Service account for MaxCompute. |
+| MC | MC__CREDENTIALS | Credentials for MaxCompute. |
 | | MC__QUERY_FILE_PATH | Path to the query file. (default: /data/in/query.sql) |
 | | MC__EXECUTION_PROJECT | Project ID for the query execution. |
-| OSS | OSS__SERVICE_ACCOUNT | Service account for OSS. |
+| OSS | OSS__CREDENTIALS | Credentials for OSS. |
 | | OSS__SOURCE_BUCKET_PATH | The source path in a OSS bucket to read the files. Must include the OSS bucket name. |
 | | OSS__FILE_FORMAT | File format availability: CSV, JSON. (default: JSON) |
 | | OSS__CSV_DELIMITER | Delimiter for CSV file format. (default: ,) |
@@ -100,12 +100,12 @@ It applies when sink and source are in the same environment. For example, transf

 | Component | Configuration | Description |
 |---|---|---|
-| MC | MC__SERVICE_ACCOUNT | Service account for MaxCompute. |
+| MC | MC__CREDENTIALS | Credentials for MaxCompute. |
 | | MC__DESTINATION_TABLE_ID | Destination table ID in Maxcompute. |
 | | MC__LOAD_METHOD | Load method availability: APPEND, REPLACE. (default: APPEND) |
 | | MC__UPLOAD_MODE | Upload mode availability: STREAM, REGULAR. (default: STREAM) |
 | IO | - | - |
-| OSS | OSS__SERVICE_ACCOUNT | Service account for OSS. |
+| OSS | OSS__CREDENTIALS | Credentials for OSS. |
 | | OSS__DESTINATION_BUCKET_PATH | The destination path in a OSS bucket to put the result files. Must include the OSS bucket name. |
 | | OSS__GROUP_BY | Available option: BATCH, COLUMN. "" for ignore |
 | | OSS__GROUP_BATCH_SIZE | Batch size for the group by BATCH. |
@@ -138,7 +138,7 @@ It applies when sink and source are in the same environment. For example, transf
 ## Supported Direct Execution For Data Transfer
 | Component | Configuration | Description |
 |---|---|---|
-| OSS2MC | OSS2MC__SERVICE_ACCOUNT | Service account for MaxCompute. |
+| OSS2MC | OSS2MC__CREDENTIALS | Credentials for MaxCompute. |
 | | OSS2MC__SOURCE_BUCKET_PATH | The source path in a OSS bucket to read the files. Must include the OSS bucket name. |
 | | OSS2MC__DESTINATION_TABLE_ID | Destination table ID in Maxcompute. |
 | | OSS2MC__FILE_FORMAT | File format availability: CSV, JSON. (default: JSON) |
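The `<source/sink>__<config_name>` convention shown above is what the renamed variables follow. As a minimal sketch of that convention — the `lookup` helper and the `main` wrapper below are hypothetical, not part of the repository — reading the `MC__*` variables by hand would look like this:

```go
package main

import (
	"fmt"
	"os"
)

// lookup is a hypothetical helper: it reads <COMPONENT>__<CONFIG_NAME> from the
// environment and falls back to a default when the variable is unset.
func lookup(component, name, def string) string {
	if v, ok := os.LookupEnv(component + "__" + name); ok {
		return v
	}
	return def
}

func main() {
	// Variable names mirror the README tables; values come from --env flags,
	// e.g. --env="MC__CREDENTIALS='{creds:1}'".
	creds := lookup("MC", "CREDENTIALS", "")
	table := lookup("MC", "DESTINATION_TABLE_ID", "")
	loadMethod := lookup("MC", "LOAD_METHOD", "APPEND") // default taken from the sink table
	fmt.Println(len(creds) > 0, table, loadMethod)
}
```

In the repository itself this mapping is expressed through the `env:` struct tags changed in the `internal/config` files below.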
2 changes: 1 addition & 1 deletion ext/direct/oss2mc.go
@@ -30,7 +30,7 @@ type OSS2MC struct {
 func NewOSS2MC(ctx context.Context, l *slog.Logger, cfg *config.OSS2MCConfig, opts ...option.Option) (flow.NoFlow, error) {

 	// create client for maxcompute
-	client, err := maxcompute.NewClient(cfg.ServiceAccount)
+	client, err := maxcompute.NewClient(cfg.Credentials)
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
4 changes: 2 additions & 2 deletions ext/maxcompute/client.go
@@ -24,8 +24,8 @@ func collectMaxComputeCredential(jsonData []byte) (*maxComputeCredentials, error
 }

 // NewClient creates a new MaxCompute client
-func NewClient(svcAcc string) (*odps.Odps, error) {
-	cred, err := collectMaxComputeCredential([]byte(svcAcc))
+func NewClient(rawCreds string) (*odps.Odps, error) {
+	cred, err := collectMaxComputeCredential([]byte(rawCreds))
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
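Only the signature of `collectMaxComputeCredential` appears in this hunk. For orientation, here is a sketch of a JSON-credentials parser with that shape — the struct fields and the `github.com/pkg/errors` import path are assumptions, not confirmed by the diff:

```go
package maxcompute

import (
	"encoding/json"

	"github.com/pkg/errors" // assumed source of errors.WithStack
)

// maxComputeCredentials is sketched with assumed field names; the real
// struct definition is not part of this change.
type maxComputeCredentials struct {
	AccessID  string `json:"access_id"`
	AccessKey string `json:"access_key"`
	Endpoint  string `json:"endpoint"`
}

// collectMaxComputeCredential decodes the raw credentials JSON that callers
// now pass to NewClient as rawCreds (formerly svcAcc).
func collectMaxComputeCredential(jsonData []byte) (*maxComputeCredentials, error) {
	cred := &maxComputeCredentials{}
	if err := json.Unmarshal(jsonData, cred); err != nil {
		return nil, errors.WithStack(err)
	}
	return cred, nil
}
```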
6 changes: 2 additions & 4 deletions ext/maxcompute/sink.go
@@ -40,14 +40,12 @@ type MaxcomputeSink struct {
 var _ flow.Sink = (*MaxcomputeSink)(nil)

 // NewSink creates a new MaxcomputeSink
-// svcAcc is the service account json string refer to maxComputeCredentials
-// tableID is the table ID to write to, it must be in the format of project_name.schema_name.table_name
-func NewSink(l *slog.Logger, svcAcc string, tableID string, loadMethod string, uploadMode string, opts ...option.Option) (*MaxcomputeSink, error) {
+func NewSink(l *slog.Logger, creds string, tableID string, loadMethod string, uploadMode string, opts ...option.Option) (*MaxcomputeSink, error) {
 	// create commonSink sink
 	commonSink := sink.NewCommonSink(l, opts...)

 	// create client for maxcompute
-	client, err := NewClient(svcAcc)
+	client, err := NewClient(creds)
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
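After the rename, the first string argument to `NewSink` is the raw credentials value rather than a service account. A hypothetical wrapper (not in the codebase) wiring it up from the `MC__*` variables, with defaults matching the README sink table:

```go
package maxcompute

import (
	"log/slog"
	"os"
)

// newSinkFromEnv is a hypothetical convenience wrapper around NewSink;
// defaults mirror MC__LOAD_METHOD=APPEND and MC__UPLOAD_MODE=STREAM.
func newSinkFromEnv(l *slog.Logger) (*MaxcomputeSink, error) {
	return NewSink(l,
		os.Getenv("MC__CREDENTIALS"),          // raw credentials JSON, formerly MC__SERVICE_ACCOUNT
		os.Getenv("MC__DESTINATION_TABLE_ID"), // project_name.schema_name.table_name
		"APPEND",
		"STREAM",
	)
}
```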
4 changes: 2 additions & 2 deletions ext/maxcompute/source.go
@@ -27,12 +27,12 @@ type MaxcomputeSource struct {
 var _ flow.Source = (*MaxcomputeSource)(nil)

 // NewSource creates a new MaxcomputeSource.
-func NewSource(l *slog.Logger, svcAcc string, queryFilePath string, executionProject string, opts ...option.Option) (*MaxcomputeSource, error) {
+func NewSource(l *slog.Logger, creds string, queryFilePath string, executionProject string, opts ...option.Option) (*MaxcomputeSource, error) {
 	// create commonSource source
 	commonSource := source.NewCommonSource(l, opts...)

 	// create client for maxcompute
-	client, err := NewClient(svcAcc)
+	client, err := NewClient(creds)
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
4 changes: 2 additions & 2 deletions ext/oss/client.go
@@ -25,8 +25,8 @@ func parseOSSCredentials(data []byte) (*ossCredentials, error) {
 	return cred, nil
 }

-func NewOSSClient(svcAcc string) (*oss.Client, error) {
-	cred, err := parseOSSCredentials([]byte(svcAcc))
+func NewOSSClient(rawCreds string) (*oss.Client, error) {
+	cred, err := parseOSSCredentials([]byte(rawCreds))
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
4 changes: 2 additions & 2 deletions ext/oss/sink.go
@@ -41,7 +41,7 @@ var _ flow.Sink = (*OSSSink)(nil)

 // NewSink creates a new OSSSink
 func NewSink(ctx context.Context, l *slog.Logger,
-	svcAcc, destinationBucketPath string,
+	creds, destinationBucketPath string,
 	groupBy string, groupBatchSize int, groupColumnName string,
 	columnMappingFilePath string,
 	filenamePattern string, enableOverwrite bool,
@@ -51,7 +51,7 @@ func NewSink(ctx context.Context, l *slog.Logger,
 	commonSink := sink.NewCommonSink(l, opts...)

 	// create OSS client
-	client, err := NewOSSClient(svcAcc)
+	client, err := NewOSSClient(creds)
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
4 changes: 2 additions & 2 deletions ext/oss/source.go
@@ -36,14 +36,14 @@ type OSSSource struct {
 var _ flow.Source = (*OSSSource)(nil)

 // NewSource creates a new OSSSource.
-func NewSource(ctx context.Context, l *slog.Logger, svcAcc string,
+func NewSource(ctx context.Context, l *slog.Logger, creds string,
 	sourceBucketPath, fileFormat string, csvDelimiter rune,
 	columnMappingFilePath string, opts ...option.Option) (*OSSSource, error) {
 	// create commonSource source
 	commonSource := source.NewCommonSource(l, opts...)

 	// create OSS client
-	client, err := NewOSSClient(svcAcc)
+	client, err := NewOSSClient(creds)
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
8 changes: 4 additions & 4 deletions internal/component/component.go
@@ -50,7 +50,7 @@ func GetSource(ctx context.Context, l *slog.Logger, source Type, cfg *config.Con
 		if err != nil {
 			return nil, errors.WithStack(err)
 		}
-		return maxcompute.NewSource(l, sourceCfg.ServiceAccount, sourceCfg.QueryFilePath, sourceCfg.ExecutionProject, opts...)
+		return maxcompute.NewSource(l, sourceCfg.Credentials, sourceCfg.QueryFilePath, sourceCfg.ExecutionProject, opts...)
 	case FILE:
 		sourceCfg, err := config.SourceFile(envs...)
 		if err != nil {
@@ -78,7 +78,7 @@ func GetSource(ctx context.Context, l *slog.Logger, source Type, cfg *config.Con
 		if err != nil {
 			return nil, errors.WithStack(err)
 		}
-		return oss.NewSource(ctx, l, sourceCfg.ServiceAccount, sourceCfg.SourceBucketPath, sourceCfg.FileFormat, sourceCfg.CSVDelimiter, sourceCfg.ColumnMappingFilePath, opts...)
+		return oss.NewSource(ctx, l, sourceCfg.Credentials, sourceCfg.SourceBucketPath, sourceCfg.FileFormat, sourceCfg.CSVDelimiter, sourceCfg.ColumnMappingFilePath, opts...)
 	case IO:
 	}
 	return nil, fmt.Errorf("source: unknown source: %s", source)
@@ -97,7 +97,7 @@ func GetSink(ctx context.Context, l *slog.Logger, sink Type, cfg *config.Config,
 		if err != nil {
 			return nil, errors.WithStack(err)
 		}
-		return maxcompute.NewSink(l, sinkCfg.ServiceAccount, sinkCfg.DestinationTableID, sinkCfg.LoadMethod, sinkCfg.UploadMode, opts...)
+		return maxcompute.NewSink(l, sinkCfg.Credentials, sinkCfg.DestinationTableID, sinkCfg.LoadMethod, sinkCfg.UploadMode, opts...)
 	case FILE:
 	case IO:
 		return io.NewSink(l), nil
@@ -106,7 +106,7 @@ func GetSink(ctx context.Context, l *slog.Logger, sink Type, cfg *config.Config,
 		if err != nil {
 			return nil, errors.WithStack(err)
 		}
-		return oss.NewSink(ctx, l, sinkCfg.ServiceAccount,
+		return oss.NewSink(ctx, l, sinkCfg.Credentials,
 			sinkCfg.DestinationBucketPath,
 			sinkCfg.GroupBy, sinkCfg.GroupBatchSize, sinkCfg.GroupColumnName,
 			sinkCfg.ColumnMappingFilePath,
2 changes: 1 addition & 1 deletion internal/config/direct_oss2mc.go
@@ -4,7 +4,7 @@ package config
 type OSS2MCConfig struct {
 	SourceBucketPath   string   `env:"OSS2MC__SOURCE_BUCKET_PATH"`
 	FileFormat         string   `env:"OSS2MC__FILE_FORMAT" envDefault:"json"`
-	ServiceAccount     string   `env:"OSS2MC__SERVICE_ACCOUNT"`
+	Credentials        string   `env:"OSS2MC__CREDENTIALS"`
 	DestinationTableID string   `env:"OSS2MC__DESTINATION_TABLE_ID"`
 	LoadMethod         string   `env:"OSS2MC__LOAD_METHOD" envDefault:"APPEND"`
 	PartitionValues    []string `env:"OSS2MC__PARTITION_VALUES" envDefault:""`
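The `env:`/`envDefault:` tags on these config structs match the conventions of an env-tag parser such as `github.com/caarlos0/env`; which parser the project actually uses is not visible in this diff. A hedged sketch of populating the renamed field, with the struct reproduced (trimmed) for the example:

```go
package main

import (
	"fmt"

	env "github.com/caarlos0/env/v11" // assumed parser, chosen because the tags match its conventions
)

// OSS2MCConfig is reproduced here with a subset of fields, purely for the sketch.
type OSS2MCConfig struct {
	SourceBucketPath   string `env:"OSS2MC__SOURCE_BUCKET_PATH"`
	Credentials        string `env:"OSS2MC__CREDENTIALS"`
	DestinationTableID string `env:"OSS2MC__DESTINATION_TABLE_ID"`
	LoadMethod         string `env:"OSS2MC__LOAD_METHOD" envDefault:"APPEND"`
}

func main() {
	cfg := OSS2MCConfig{}
	if err := env.Parse(&cfg); err != nil { // fills the fields from the OSS2MC__* variables
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```

The same pattern applies to the MC and OSS source/sink config structs changed below.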
2 changes: 1 addition & 1 deletion internal/config/sink_mc.go
@@ -2,7 +2,7 @@ package config

 // SinkMCConfig is a configuration for the sink maxcompute component.
 type SinkMCConfig struct {
-	ServiceAccount     string `env:"MC__SERVICE_ACCOUNT"`
+	Credentials        string `env:"MC__CREDENTIALS"`
 	DestinationTableID string `env:"MC__DESTINATION_TABLE_ID"`
 	LoadMethod         string `env:"MC__LOAD_METHOD" envDefault:"APPEND"`
 	UploadMode         string `env:"MC__UPLOAD_MODE" envDefault:"STREAM"`
2 changes: 1 addition & 1 deletion internal/config/sink_oss.go
@@ -1,7 +1,7 @@
 package config

 type SinkOSSConfig struct {
-	ServiceAccount        string `env:"OSS__SERVICE_ACCOUNT"`
+	Credentials           string `env:"OSS__CREDENTIALS"`
 	DestinationBucketPath string `env:"OSS__DESTINATION_BUCKET_PATH"`
 	GroupBy               string `env:"OSS__GROUP_BY"`
 	GroupBatchSize        int    `env:"OSS__GROUP_BATCH_SIZE" envDefault:"1000"`
2 changes: 1 addition & 1 deletion internal/config/source_mc.go
@@ -2,7 +2,7 @@ package config

 // SourceMCConfig is a configuration for the source maxcompute component.
 type SourceMCConfig struct {
-	ServiceAccount   string `env:"MC__SERVICE_ACCOUNT"`
+	Credentials      string `env:"MC__CREDENTIALS"`
 	QueryFilePath    string `env:"MC__QUERY_FILE_PATH" envDefault:"/data/in/query.sql"`
 	ExecutionProject string `env:"MC__EXECUTION_PROJECT"`
 }
2 changes: 1 addition & 1 deletion internal/config/source_oss.go
@@ -2,7 +2,7 @@ package config

 // SourceOSSConfig is a configuration for the source oss component.
 type SourceOSSConfig struct {
-	ServiceAccount        string `env:"OSS__SERVICE_ACCOUNT"`
+	Credentials           string `env:"OSS__CREDENTIALS"`
 	SourceBucketPath      string `env:"OSS__SOURCE_BUCKET_PATH"`
 	FileFormat            string `env:"OSS__FILE_FORMAT" envDefault:"json"`
 	CSVDelimiter          rune   `env:"OSS__CSV_DELIMITER" envDefault:","`
