Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling errors in AmazonS3MoveCleanUpPolicy configuration #694

Open
Teju2404 opened this issue Jan 22, 2025 · 3 comments
Open

Handling errors in AmazonS3MoveCleanUpPolicy configuration #694

Teju2404 opened this issue Jan 22, 2025 · 3 comments
Labels
question Further information is requested

Comments

@Teju2404
Copy link

Hi,

Does AmazonS3MoveCleanUpPolicy("fs. cleanup.policy-move.failure.aws.bucket.name": "”) configuration support storing the entire file to the error path when issue occur or does it store only the problematic records?

"name": "s3_file_pulse_connector",
"config": {
"connector class": "io. streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"topic": "",
"tasks.max": "1",
"tasks. reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3ROwFileInputReader",
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing",
"aws. s3. bucket. name": "
"aws. s3. bucket-prefix": "”
"aws. s3.region": "",
"fs. cleanup policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy",
"fs. cleanup-policy.move. success.aws.bucket.name": "”
"fs.cleanup.policy.move.success.aws.prefix.path": "”
"fs. cleanup.policy-move.failure.aws.bucket.name": "”
"fs.cleanup.policy.move.failure.aws.prefix.path": “",
"tasks. file.status.storage.bootstrap.servers": “”
"tasks. file.status.storage.topic":
"tasks. file.status.storage.topic.partitions":10,
"tasks. file.status.storage.topic.replication. factor":1,
"errors. log. include messages": "true",
"errors. log enable": "true",
"key, converter": "org. apache. kafka.connect. storage. StringConverter",
"value. converter": "org,apache.kafka.connect.storage.StringConverter"
}

@Teju2404 Teju2404 added the question Further information is requested label Jan 22, 2025
@fhussonnois
Copy link
Member

Hi @Teju2404, the cleanup policy is used to move the entire file when an error occured during processing.

@Teju2404
Copy link
Author

Hi @fhussonnois , Thank you for the response!

I have few other questions, can you please help me here?

  1. We have a use case where files need to be read from an S3 bucket using the FilePulse connector. If there are any issues reading a file with FilePulse, is there a configuration available to send individual failed records? If yes, could you provide the configuration details?
  2. When reading from S3 using the FilePulse connector, if there are multiple optional fields that do not need to be published to the topic, is there a configuration available to exclude those fields from being published?

@Teju2404
Copy link
Author

Hi @fhussonnois can you help me on my questions?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

2 participants