From 3f2fbb492259755eb0b20c59b90b28872a22e402 Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Mon, 3 Feb 2025 14:31:59 +0700 Subject: [PATCH] refactor: rename service account to credentials --- README.md | 18 +++++++++--------- ext/direct/oss2mc.go | 2 +- ext/maxcompute/client.go | 4 ++-- ext/maxcompute/sink.go | 6 ++---- ext/maxcompute/source.go | 4 ++-- ext/oss/client.go | 4 ++-- ext/oss/sink.go | 4 ++-- ext/oss/source.go | 4 ++-- internal/component/component.go | 8 ++++---- internal/config/direct_oss2mc.go | 2 +- internal/config/sink_mc.go | 2 +- internal/config/sink_oss.go | 2 +- internal/config/source_mc.go | 2 +- internal/config/source_oss.go | 2 +- 14 files changed, 31 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index b8e8354..252a0b8 100644 --- a/README.md +++ b/README.md @@ -34,14 +34,14 @@ It expects configuration from env variables. Or you can pass configuration from ```sh ./any2any --from=file --to=mc \ --env="FILE__PATH=./in.txt" \ ---env="MC__SERVICE_ACCOUNT=svc_account" \ +--env="MC__CREDENTIALS='{creds:1}'" \ --env="MC__DESTINATION_TABLE_ID=project.sample.table" ``` ### Configuration convention `__`, example: - FILE__PATH: Path to the input file. -- MC__SERVICE_ACCOUNT: Service account for MaxCompute. +- MC__CREDENTIALS: Credentials for MaxCompute. - MC__DESTINATION_TABLE_ID: Destination table ID in MaxCompute. @@ -53,7 +53,7 @@ You can use the JQ processor to filter or transform data before transferring it ```sh ./any2any --from=file --to=mc \ --env="FILE__PATH=./in.txt" \ ---env="MC__SERVICE_ACCOUNT=svc_account" \ +--env="MC__CREDENTIALS='{creds:1}'" \ --env="MC__DESTINATION_TABLE_ID=project.sample.table" \ --env="JQ__QUERY=.[] | select(.age > 30)" ``` @@ -65,7 +65,7 @@ It applies when sink and source are in the same environment. For example, transf ```sh ./any2any --from=oss --to=mc --no-pipeline \ --env="OSS2MC__SOURCE_BUCKET_PATH=bucket/path" \ ---env="OSS2MC__SERVICE_ACCOUNT=svc_account" \ +--env="OSS2MC__CREDENTIALS='{creds:1}'" \ --env="OSS2MC__DESTINATION_TABLE_ID=project.sample.table" ``` @@ -88,10 +88,10 @@ It applies when sink and source are in the same environment. For example, transf | | GMAIL__EXTRACTOR_FILE_FORMAT | Which format of file to be extracted (csv, json) (default: csv) | | | GMAIL__FILENAME_COLUMN | Column name to retain filename of downloaded file. "" for ignore (default: "__FILENAME__") | | | GMAIL__COLUMN_MAPPING_FILE_PATH | Path to the mapping column for gmail record result. "" for ignore (default: "") | -| MC | MC__SERVICE_ACCOUNT | Service account for MaxCompute. | +| MC | MC__CREDENTIALS | Credentials for MaxCompute. | | | MC__QUERY_FILE_PATH | Path to the query file. (default: /data/in/query.sql) | | | MC__EXECUTION_PROJECT | Project ID for the query execution. | -| OSS | OSS__SERVICE_ACCOUNT | Service account for OSS. | +| OSS | OSS__CREDENTIALS | Credentials for OSS. | | | OSS__SOURCE_BUCKET_PATH | The source path in a OSS bucket to read the files. Must include the OSS bucket name. | | | OSS__FILE_FORMAT | File format availability: CSV, JSON. (default: JSON) | | | OSS__CSV_DELIMITER | Delimiter for CSV file format. (default: ,) | @@ -100,12 +100,12 @@ It applies when sink and source are in the same environment. For example, transf | Component | Configuration | Description | |---|---|---| -| MC | MC__SERVICE_ACCOUNT | Service account for MaxCompute. | +| MC | MC__CREDENTIALS | Credentials for MaxCompute. | | | MC__DESTINATION_TABLE_ID | Destination table ID in Maxcompute. | | | MC__LOAD_METHOD | Load method availability: APPEND, REPLACE. (default: APPEND) | | | MC__UPLOAD_MODE | Upload mode availability: STREAM, REGULAR. (default: STREAM) | | IO | - | - | -| OSS | OSS__SERVICE_ACCOUNT | Service account for OSS. | +| OSS | OSS__CREDENTIALS | Credentials for OSS. | | | OSS__DESTINATION_BUCKET_PATH | The destination path in a OSS bucket to put the result files. Must include the OSS bucket name. | | | OSS__GROUP_BY | Available option: BATCH, COLUMN. "" for ignore | | | OSS__GROUP_BATCH_SIZE | Batch size for the group by BATCH. | @@ -138,7 +138,7 @@ It applies when sink and source are in the same environment. For example, transf ## Supported Direct Execution For Data Transfer | Component | Configuration | Description | |---|---|---| -| OSS2MC | OSS2MC__SERVICE_ACCOUNT | Service account for MaxCompute. | +| OSS2MC | OSS2MC__CREDENTIALS | Credentials for MaxCompute. | | | OSS2MC__SOURCE_BUCKET_PATH | The source path in a OSS bucket to read the files. Must include the OSS bucket name. | | | OSS2MC__DESTINATION_TABLE_ID | Destination table ID in Maxcompute. | | | OSS2MC__FILE_FORMAT | File format availability: CSV, JSON. (default: JSON) | diff --git a/ext/direct/oss2mc.go b/ext/direct/oss2mc.go index 8d2824b..0507e04 100644 --- a/ext/direct/oss2mc.go +++ b/ext/direct/oss2mc.go @@ -30,7 +30,7 @@ type OSS2MC struct { func NewOSS2MC(ctx context.Context, l *slog.Logger, cfg *config.OSS2MCConfig, opts ...option.Option) (flow.NoFlow, error) { // create client for maxcompute - client, err := maxcompute.NewClient(cfg.ServiceAccount) + client, err := maxcompute.NewClient(cfg.Credentials) if err != nil { return nil, errors.WithStack(err) } diff --git a/ext/maxcompute/client.go b/ext/maxcompute/client.go index b9f641f..3c6d8d7 100644 --- a/ext/maxcompute/client.go +++ b/ext/maxcompute/client.go @@ -24,8 +24,8 @@ func collectMaxComputeCredential(jsonData []byte) (*maxComputeCredentials, error } // NewClient creates a new MaxCompute client -func NewClient(svcAcc string) (*odps.Odps, error) { - cred, err := collectMaxComputeCredential([]byte(svcAcc)) +func NewClient(rawCreds string) (*odps.Odps, error) { + cred, err := collectMaxComputeCredential([]byte(rawCreds)) if err != nil { return nil, errors.WithStack(err) } diff --git a/ext/maxcompute/sink.go b/ext/maxcompute/sink.go index d8f8e63..efd662e 100644 --- a/ext/maxcompute/sink.go +++ b/ext/maxcompute/sink.go @@ -40,14 +40,12 @@ type MaxcomputeSink struct { var _ flow.Sink = (*MaxcomputeSink)(nil) // NewSink creates a new MaxcomputeSink -// svcAcc is the service account json string refer to maxComputeCredentials -// tableID is the table ID to write to, it must be in the format of project_name.schema_name.table_name -func NewSink(l *slog.Logger, svcAcc string, tableID string, loadMethod string, uploadMode string, opts ...option.Option) (*MaxcomputeSink, error) { +func NewSink(l *slog.Logger, creds string, tableID string, loadMethod string, uploadMode string, opts ...option.Option) (*MaxcomputeSink, error) { // create commonSink sink commonSink := sink.NewCommonSink(l, opts...) // create client for maxcompute - client, err := NewClient(svcAcc) + client, err := NewClient(creds) if err != nil { return nil, errors.WithStack(err) } diff --git a/ext/maxcompute/source.go b/ext/maxcompute/source.go index 85cce59..cf1560d 100644 --- a/ext/maxcompute/source.go +++ b/ext/maxcompute/source.go @@ -27,12 +27,12 @@ type MaxcomputeSource struct { var _ flow.Source = (*MaxcomputeSource)(nil) // NewSource creates a new MaxcomputeSource. -func NewSource(l *slog.Logger, svcAcc string, queryFilePath string, executionProject string, opts ...option.Option) (*MaxcomputeSource, error) { +func NewSource(l *slog.Logger, creds string, queryFilePath string, executionProject string, opts ...option.Option) (*MaxcomputeSource, error) { // create commonSource source commonSource := source.NewCommonSource(l, opts...) // create client for maxcompute - client, err := NewClient(svcAcc) + client, err := NewClient(creds) if err != nil { return nil, errors.WithStack(err) } diff --git a/ext/oss/client.go b/ext/oss/client.go index 1550b51..09cf331 100644 --- a/ext/oss/client.go +++ b/ext/oss/client.go @@ -25,8 +25,8 @@ func parseOSSCredentials(data []byte) (*ossCredentials, error) { return cred, nil } -func NewOSSClient(svcAcc string) (*oss.Client, error) { - cred, err := parseOSSCredentials([]byte(svcAcc)) +func NewOSSClient(rawCreds string) (*oss.Client, error) { + cred, err := parseOSSCredentials([]byte(rawCreds)) if err != nil { return nil, errors.WithStack(err) } diff --git a/ext/oss/sink.go b/ext/oss/sink.go index 09b362f..51c95fb 100644 --- a/ext/oss/sink.go +++ b/ext/oss/sink.go @@ -41,7 +41,7 @@ var _ flow.Sink = (*OSSSink)(nil) // NewSink creates a new OSSSink func NewSink(ctx context.Context, l *slog.Logger, - svcAcc, destinationBucketPath string, + creds, destinationBucketPath string, groupBy string, groupBatchSize int, groupColumnName string, columnMappingFilePath string, filenamePattern string, enableOverwrite bool, @@ -51,7 +51,7 @@ func NewSink(ctx context.Context, l *slog.Logger, commonSink := sink.NewCommonSink(l, opts...) // create OSS client - client, err := NewOSSClient(svcAcc) + client, err := NewOSSClient(creds) if err != nil { return nil, errors.WithStack(err) } diff --git a/ext/oss/source.go b/ext/oss/source.go index 9b090e9..e7dcdd6 100644 --- a/ext/oss/source.go +++ b/ext/oss/source.go @@ -36,14 +36,14 @@ type OSSSource struct { var _ flow.Source = (*OSSSource)(nil) // NewSource creates a new OSSSource. -func NewSource(ctx context.Context, l *slog.Logger, svcAcc string, +func NewSource(ctx context.Context, l *slog.Logger, creds string, sourceBucketPath, fileFormat string, csvDelimiter rune, columnMappingFilePath string, opts ...option.Option) (*OSSSource, error) { // create commonSource source commonSource := source.NewCommonSource(l, opts...) // create OSS client - client, err := NewOSSClient(svcAcc) + client, err := NewOSSClient(creds) if err != nil { return nil, errors.WithStack(err) } diff --git a/internal/component/component.go b/internal/component/component.go index 1eeffea..3ee7f1b 100644 --- a/internal/component/component.go +++ b/internal/component/component.go @@ -50,7 +50,7 @@ func GetSource(ctx context.Context, l *slog.Logger, source Type, cfg *config.Con if err != nil { return nil, errors.WithStack(err) } - return maxcompute.NewSource(l, sourceCfg.ServiceAccount, sourceCfg.QueryFilePath, sourceCfg.ExecutionProject, opts...) + return maxcompute.NewSource(l, sourceCfg.Credentials, sourceCfg.QueryFilePath, sourceCfg.ExecutionProject, opts...) case FILE: sourceCfg, err := config.SourceFile(envs...) if err != nil { @@ -78,7 +78,7 @@ func GetSource(ctx context.Context, l *slog.Logger, source Type, cfg *config.Con if err != nil { return nil, errors.WithStack(err) } - return oss.NewSource(ctx, l, sourceCfg.ServiceAccount, sourceCfg.SourceBucketPath, sourceCfg.FileFormat, sourceCfg.CSVDelimiter, sourceCfg.ColumnMappingFilePath, opts...) + return oss.NewSource(ctx, l, sourceCfg.Credentials, sourceCfg.SourceBucketPath, sourceCfg.FileFormat, sourceCfg.CSVDelimiter, sourceCfg.ColumnMappingFilePath, opts...) case IO: } return nil, fmt.Errorf("source: unknown source: %s", source) @@ -97,7 +97,7 @@ func GetSink(ctx context.Context, l *slog.Logger, sink Type, cfg *config.Config, if err != nil { return nil, errors.WithStack(err) } - return maxcompute.NewSink(l, sinkCfg.ServiceAccount, sinkCfg.DestinationTableID, sinkCfg.LoadMethod, sinkCfg.UploadMode, opts...) + return maxcompute.NewSink(l, sinkCfg.Credentials, sinkCfg.DestinationTableID, sinkCfg.LoadMethod, sinkCfg.UploadMode, opts...) case FILE: case IO: return io.NewSink(l), nil @@ -106,7 +106,7 @@ func GetSink(ctx context.Context, l *slog.Logger, sink Type, cfg *config.Config, if err != nil { return nil, errors.WithStack(err) } - return oss.NewSink(ctx, l, sinkCfg.ServiceAccount, + return oss.NewSink(ctx, l, sinkCfg.Credentials, sinkCfg.DestinationBucketPath, sinkCfg.GroupBy, sinkCfg.GroupBatchSize, sinkCfg.GroupColumnName, sinkCfg.ColumnMappingFilePath, diff --git a/internal/config/direct_oss2mc.go b/internal/config/direct_oss2mc.go index 89e7962..8db8a4a 100644 --- a/internal/config/direct_oss2mc.go +++ b/internal/config/direct_oss2mc.go @@ -4,7 +4,7 @@ package config type OSS2MCConfig struct { SourceBucketPath string `env:"OSS2MC__SOURCE_BUCKET_PATH"` FileFormat string `env:"OSS2MC__FILE_FORMAT" envDefault:"json"` - ServiceAccount string `env:"OSS2MC__SERVICE_ACCOUNT"` + Credentials string `env:"OSS2MC__CREDENTIALS"` DestinationTableID string `env:"OSS2MC__DESTINATION_TABLE_ID"` LoadMethod string `env:"OSS2MC__LOAD_METHOD" envDefault:"APPEND"` PartitionValues []string `env:"OSS2MC__PARTITION_VALUES" envDefault:""` diff --git a/internal/config/sink_mc.go b/internal/config/sink_mc.go index 5f3be4a..7406f9e 100644 --- a/internal/config/sink_mc.go +++ b/internal/config/sink_mc.go @@ -2,7 +2,7 @@ package config // SinkMCConfig is a configuration for the sink maxcompute component. type SinkMCConfig struct { - ServiceAccount string `env:"MC__SERVICE_ACCOUNT"` + Credentials string `env:"MC__CREDENTIALS"` DestinationTableID string `env:"MC__DESTINATION_TABLE_ID"` LoadMethod string `env:"MC__LOAD_METHOD" envDefault:"APPEND"` UploadMode string `env:"MC__UPLOAD_MODE" envDefault:"STREAM"` diff --git a/internal/config/sink_oss.go b/internal/config/sink_oss.go index 23ad442..bdff267 100644 --- a/internal/config/sink_oss.go +++ b/internal/config/sink_oss.go @@ -1,7 +1,7 @@ package config type SinkOSSConfig struct { - ServiceAccount string `env:"OSS__SERVICE_ACCOUNT"` + Credentials string `env:"OSS__CREDENTIALS"` DestinationBucketPath string `env:"OSS__DESTINATION_BUCKET_PATH"` GroupBy string `env:"OSS__GROUP_BY"` GroupBatchSize int `env:"OSS__GROUP_BATCH_SIZE" envDefault:"1000"` diff --git a/internal/config/source_mc.go b/internal/config/source_mc.go index 45659b6..f133595 100644 --- a/internal/config/source_mc.go +++ b/internal/config/source_mc.go @@ -2,7 +2,7 @@ package config // SourceMCConfig is a configuration for the source maxcompute component. type SourceMCConfig struct { - ServiceAccount string `env:"MC__SERVICE_ACCOUNT"` + Credentials string `env:"MC__CREDENTIALS"` QueryFilePath string `env:"MC__QUERY_FILE_PATH" envDefault:"/data/in/query.sql"` ExecutionProject string `env:"MC__EXECUTION_PROJECT"` } diff --git a/internal/config/source_oss.go b/internal/config/source_oss.go index 086b3a4..dff5d18 100644 --- a/internal/config/source_oss.go +++ b/internal/config/source_oss.go @@ -2,7 +2,7 @@ package config // SourceOSSConfig is a configuration for the source oss component. type SourceOSSConfig struct { - ServiceAccount string `env:"OSS__SERVICE_ACCOUNT"` + Credentials string `env:"OSS__CREDENTIALS"` SourceBucketPath string `env:"OSS__SOURCE_BUCKET_PATH"` FileFormat string `env:"OSS__FILE_FORMAT" envDefault:"json"` CSVDelimiter rune `env:"OSS__CSV_DELIMITER" envDefault:","`