fix the issue reported in issue #247 #349
kendrick-ren wants to merge 2 commits into scylladb:master
Conversation
Force-pushed d47d40e to 87b4563
dkropachev
left a comment
The behavior change itself looks reasonable, but the new test does not actually verify it. Please tighten the coverage so that the test fails on the pre-PR implementation and passes only when the S3 export path really skips savepoint management.
```scala
// Perform the migration and verify no savepoint files are created (issue #247:
// S3 export sources use ParallelCollectionPartitions, not HadoopPartitions,
// so DynamoDbSavepointsManager must not be instantiated for this path).
val beforeMigration = System.currentTimeMillis()
successfullyPerformMigration(configFile)

val savepointsDir = Paths.get("docker/spark-master")
if (Files.exists(savepointsDir)) {
  Using.resource(Files.list(savepointsDir)) { stream =>
    val newSavepoints = stream
      .iterator()
      .asScala
      .filter(p => Files.isRegularFile(p))
      .filter(_.getFileName.toString.startsWith("savepoint_"))
      .filter(p => Files.getLastModifiedTime(p).toMillis >= beforeMigration)
      .toList
    assert(
      newSavepoints.isEmpty,
      s"S3 export import should not create savepoint files, but found: $newSavepoints"
```
This assertion does not actually prove that DynamoDbSavepointsManager was skipped. Savepoints are only written on signal or after intervalSeconds elapses, and close() does not emit a final savepoint. With the current test config (intervalSeconds: 300), the pre-PR implementation would also usually leave no savepoint_* file here, so this test can still pass even when the savepoints manager is instantiated. Please make the test observe the behavior more directly.
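To illustrate why the assertion is weak, here is a minimal, self-contained model of interval-based savepoint flushing (this is an assumption about the mechanism the review describes, not the migrator's actual `DynamoDbSavepointsManager` code): with `intervalSeconds: 300` and a migration that finishes in seconds, the flush condition never fires, so "no savepoint files" holds even when the manager is running.

```scala
// Simplified stand-in (hypothetical, for illustration only): a savepoint is
// flushed only once intervalSeconds has elapsed since the last flush.
final class SavepointTicker(intervalSeconds: Long) {
  private var lastFlushMillis = 0L
  var flushes = 0
  def tick(nowMillis: Long): Unit =
    if (nowMillis - lastFlushMillis >= intervalSeconds * 1000) {
      flushes += 1
      lastFlushMillis = nowMillis
    }
}

object SavepointTickerDemo {
  def main(args: Array[String]): Unit = {
    // A 30-second migration with intervalSeconds = 300 never flushes,
    // so an "empty savepoints dir" assertion also passes pre-PR.
    val shortRun = new SavepointTicker(intervalSeconds = 300)
    (1L to 30L).foreach(s => shortRun.tick(s * 1000))
    assert(shortRun.flushes == 0)

    // Only a run longer than the interval would produce a savepoint.
    val longRun = new SavepointTicker(intervalSeconds = 300)
    (1L to 350L).foreach(s => longRun.tick(s * 1000))
    assert(longRun.flushes == 1)
  }
}
```

This is why the test needs to observe the behavior directly (e.g. that the manager is never constructed for the S3 path) rather than inferring it from the absence of files.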
@dkropachev I enhanced the test case, please take a look.
Force-pushed 87b4563 to f58e321
This PR fixes the issue reported in #247.
The root cause is that DynamoDbSavepointsManager only handles HadoopPartition, while the migrateFromS3Export path creates its RDD with spark.parallelize(), which produces ParallelCollectionPartition instead.
When the savepoints manager tries to extract the DynamoDBSplit segment from the partitions, it fails to process such partitions.
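The failure mode can be sketched with a self-contained model (the type names below mirror Spark's but are local stand-ins, and `extractSegment` is a hypothetical simplification of the savepoints manager's partition handling, not the actual code):

```scala
object SavepointPartitionModel {
  sealed trait Partition
  // Stand-ins for Spark's partition types (simplified):
  final case class HadoopPartition(segment: Int) extends Partition
  final case class ParallelCollectionPartition(files: Seq[String]) extends Partition

  // Mirrors the pre-PR assumption: every partition is a HadoopPartition.
  def extractSegment(p: Partition): Int = p match {
    case HadoopPartition(segment) => segment
    // No case for ParallelCollectionPartition: a scala.MatchError at runtime,
    // which is the shape of the failure on the S3 export path.
  }
}

object RootCauseDemo {
  def main(args: Array[String]): Unit = {
    import SavepointPartitionModel._
    assert(extractSegment(HadoopPartition(7)) == 7)
    val failed =
      try { extractSegment(ParallelCollectionPartition(Seq("part-0001.json"))); false }
      catch { case _: MatchError => true }
    assert(failed)
  }
}
```

The DynamoDB table scan path only ever produces Hadoop-style partitions, which is why the assumption held until the S3 export path reused the same migrate() plumbing.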
The PR introduces a parameter in the underlying migrate() method indicating which type of partition is in play, and invokes writers.DynamoDB.writeRDD directly for the S3 ParallelCollectionPartition path.
The PR does not add savepoint support for the S3 path, since that would require a different mechanism to keep track of the processed files and filter them out on restart. This can be added as a future enhancement.
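For reference, such a future mechanism could track processed export files by name; this is a hypothetical sketch (the class `S3FileSavepoints` and its methods are invented for illustration and do not exist in the codebase):

```scala
// Hypothetical sketch of file-based savepoints for the S3 export path:
// persist the set of fully processed export files and skip them on restart.
final class S3FileSavepoints(initial: Set[String]) {
  private val done = scala.collection.mutable.Set.empty[String]
  done ++= initial

  def markProcessed(file: String): Unit = done += file
  def remaining(allFiles: Seq[String]): Seq[String] = allFiles.filterNot(done.contains)
  def snapshot: Set[String] = done.toSet // what a savepoint file would persist
}

object S3SavepointsDemo {
  def main(args: Array[String]): Unit = {
    val files = Seq("part-0001.json", "part-0002.json", "part-0003.json")
    val sp = new S3FileSavepoints(Set.empty)
    sp.markProcessed("part-0001.json")
    assert(sp.remaining(files) == Seq("part-0002.json", "part-0003.json"))

    // On restart, rehydrate from the persisted snapshot and resume.
    val resumed = new S3FileSavepoints(sp.snapshot)
    assert(resumed.remaining(files) == Seq("part-0002.json", "part-0003.json"))
  }
}
```

The granularity here is whole files, which is coarser than DynamoDB segment savepoints but matches how the S3 export is naturally partitioned.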