Enable Dagger Parquet Source feature using Ali OSS Service #49

Merged: 16 commits, Jan 23, 2025

Conversation

@rajuGT rajuGT commented Nov 3, 2024

No description provided.

Raju G T and others added 15 commits October 9, 2024 18:36
…t(cosn) storage services

Assuming the configuration is provided correctly, set the environment
variables below to access the files stored in the respective bucket.

Ali(oss)
- OSS_ACCESS_KEY_ID
- OSS_ACCESS_KEY_SECRET

Tencent(cos)
- COS_SECRET_ID
- COS_SECRET_KEY
- COS_REGION
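
A minimal sketch of a startup check for these variables. It assumes nothing about Dagger's actual code: the class and method names (`StorageEnvCheck`, `missing`) are hypothetical, and only the variable names come from the commit message above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Dagger: reports which required variables are unset.
final class StorageEnvCheck {
    // Variable names as listed in the commit message.
    static final List<String> OSS_VARS = Collections.unmodifiableList(
            Arrays.asList("OSS_ACCESS_KEY_ID", "OSS_ACCESS_KEY_SECRET"));
    static final List<String> COS_VARS = Collections.unmodifiableList(
            Arrays.asList("COS_SECRET_ID", "COS_SECRET_KEY", "COS_REGION"));

    private StorageEnvCheck() {
    }

    // Returns the names from 'required' that are absent or blank in 'env', in order.
    static List<String> missing(List<String> required, Map<String, String> env) {
        List<String> result = new ArrayList<>();
        for (String name : required) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                result.add(name);
            }
        }
        return result;
    }
}
```

Calling `StorageEnvCheck.missing(StorageEnvCheck.OSS_VARS, System.getenv())` before building a client would surface an incomplete environment early rather than at the first read.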

To use the COS filesystem with Dagger, set the COS bucket/key path for
state.backend.fs.checkpointdir, state.savepoints.dir, and
high-availability.storageDir in the FlinkDeployment manifest.

If those paths use the cosn scheme, Dagger reads the following settings
from the FlinkDeployment manifest:

    fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
    fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
    fs.cosn.userinfo.secretId: <secretID>
    fs.cosn.userinfo.secretKey: <secretKey>
    fs.cosn.bucket.region: <region>
    fs.cosn.bucket.endpoint_suffix: <tencent-provided-prefix.xyz.com>
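
The scheme check and key set described above can be sketched as follows. This is an illustration under stated assumptions, not Dagger's implementation: `CosnConfigSketch`, `usesCosn`, and `cosnSettings` are hypothetical names, and only the configuration keys and class names are taken from the commit message.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Dagger.
final class CosnConfigSketch {
    private CosnConfigSketch() {
    }

    // True when the path's URI scheme is "cosn", e.g. cosn://bucket/checkpoints.
    static boolean usesCosn(String path) {
        String scheme = URI.create(path).getScheme();
        return "cosn".equalsIgnoreCase(scheme);
    }

    // Builds the six settings listed in the commit message from caller-supplied values.
    static Map<String, String> cosnSettings(String secretId, String secretKey,
                                            String region, String endpointSuffix) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("fs.cosn.impl", "org.apache.hadoop.fs.CosFileSystem");
        settings.put("fs.AbstractFileSystem.cosn.impl", "org.apache.hadoop.fs.CosN");
        settings.put("fs.cosn.userinfo.secretId", secretId);
        settings.put("fs.cosn.userinfo.secretKey", secretKey);
        settings.put("fs.cosn.bucket.region", region);
        settings.put("fs.cosn.bucket.endpoint_suffix", endpointSuffix);
        return settings;
    }
}
```

A path without a scheme, such as a local directory, yields a null scheme and is treated as not using cosn.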

Most client implementations, including GCS, are not serializable. The fix
excludes the client from serialization: when the enclosing object is sent
over the wire and the client does not exist on the receiving side, the
client is initialized lazily the first time it is required.

        // In a distributed system, the client should not be serialized. Most implementations, such as the
        // GCP Storage client, do not implement java.io.Serializable, so you may see the error below:
        // Caused by: org.apache.flink.api.common.InvalidProgramException: com.google.api.services.storage.Storage@1c666a8f
        // is not serializable. The object probably contains or references non serializable fields.
        // Caused by: java.io.NotSerializableException: com.google.api.services.storage.Storage
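
The transient-plus-lazy-initialization pattern described above can be sketched as follows. This is a minimal illustration, not the code from this PR: `FakeStorageClient` and `LazyClientProvider` are hypothetical stand-ins for a storage SDK client and the Dart client provider.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for an SDK client; deliberately NOT Serializable.
final class FakeStorageClient {
    private final String bucket;

    FakeStorageClient(String bucket) {
        this.bucket = bucket;
    }

    String describe() {
        return "client for " + bucket;
    }
}

// Hypothetical provider: only the bucket name is serialized, never the client.
final class LazyClientProvider implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String bucket;
    // transient: skipped by Java serialization, so it is null after deserialization.
    private transient FakeStorageClient client;

    LazyClientProvider(String bucket) {
        this.bucket = bucket;
    }

    // Creates the client on first use, including the first use after deserialization.
    synchronized FakeStorageClient getClient() {
        if (client == null) {
            client = new FakeStorageClient(bucket);
        }
        return client;
    }

    // Round-trips a provider through Java serialization, similar to shipping it to a worker.
    static LazyClientProvider roundTrip(LazyClientProvider provider) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(provider);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyClientProvider) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

Without the `transient` modifier, `writeObject` would fail with `java.io.NotSerializableException` once the client field had been populated, which is the failure mode quoted in the comment above.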
Base automatically changed from dart-bug-fix to main January 23, 2025 08:52
 Conflicts:
	dagger-common/build.gradle
	dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProvider.java
@rajuGT rajuGT merged commit a9e00e8 into main Jan 23, 2025
3 checks passed
@rajuGT rajuGT deleted the parquet-oss branch January 23, 2025 09:03
rajuGT pushed a commit that referenced this pull request Jan 23, 2025
--

    Enable Dagger Parquet Source feature using Ali OSS Service (#49)

    * Add gradle tasks to minimal and dependencies to maven local

    * Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services
    Assuming the configuration is provided correctly, set the environment
    variables below to access the files stored in the respective bucket.

    Ali(oss)
    - OSS_ACCESS_KEY_ID
    - OSS_ACCESS_KEY_SECRET

    Tencent(cos)
    - COS_SECRET_ID
    - COS_SECRET_KEY
    - COS_REGION

    * OSS client endpoint should be configurable via ENV variable

    * COS filesystem high availability support
    To use the COS filesystem with Dagger, set the COS bucket/key path for
    state.backend.fs.checkpointdir, state.savepoints.dir, and
    high-availability.storageDir in the FlinkDeployment manifest.

    If those paths use the cosn scheme, Dagger reads the following settings
    from the FlinkDeployment manifest:

        fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
        fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
        fs.cosn.userinfo.secretId: <secretID>
        fs.cosn.userinfo.secretKey: <secretKey>
        fs.cosn.bucket.region: <region>
        fs.cosn.bucket.endpoint_suffix: <tencent-provided-prefix.xyz.com>

    * Fix checkstyle and make constants static variables

    * Refactor Dart Feature to plug other object storage service providers

    * test checkstyle fix

    * Dart Support for OSS Service Provider

    * fix checkstyle

    * Dart Support for COS Service Provider

    * Dart implementation fix - the object storage clients aren't serializable
    Most client implementations, including GCS, are not serializable. The fix
    excludes the client from serialization: when the enclosing object is sent
    over the wire and the client does not exist on the receiving side, the
    client is initialized lazily the first time it is required.

            // In a distributed system, the client should not be serialized. Most implementations, such as the
            // GCP Storage client, do not implement java.io.Serializable, so you may see the error below:
            // Caused by: org.apache.flink.api.common.InvalidProgramException: com.google.api.services.storage.Storage@1c666a8f
            // is not serializable. The object probably contains or references non serializable fields.
            // Caused by: java.io.NotSerializableException: com.google.api.services.storage.Storage

    * checkstyle fix

    * Add unit tests for DartDataStoreClientProvider

    * Enable Dagger Parquet Source feature using Ali OSS Service
rajuGT added a commit that referenced this pull request Jan 23, 2025
--

    Enable Dagger Parquet Source feature using Ali OSS Service (#49)

Co-authored-by: rajuGT <[email protected]>
2 participants