-
Notifications
You must be signed in to change notification settings - Fork 90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dvc][duckdb] API change proposal to support DVRT checkpointing #1476
Open
FelixGV
wants to merge
3
commits into
linkedin:main
Choose a base branch
from
FelixGV:dvrt_checkpointing_api
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
25 changes: 25 additions & 0 deletions
25
...nts/da-vinci-client/src/main/java/com/linkedin/davinci/store/CheckpointStorageEngine.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package com.linkedin.davinci.store; | ||
|
||
import com.linkedin.venice.kafka.protocol.state.StoreVersionState; | ||
import com.linkedin.venice.offsets.OffsetRecord; | ||
|
||
|
||
/** | ||
* Interface to read and write the metadata needed for checkpointing, including: | ||
* | ||
* - {@link com.linkedin.venice.offsets.OffsetRecord} | ||
* - {@link com.linkedin.venice.kafka.protocol.state.StoreVersionState} | ||
*/ | ||
public interface CheckpointStorageEngine { | ||
void putPartitionOffset(int partitionId, OffsetRecord offsetRecord); | ||
|
||
OffsetRecord getPartitionOffset(int partitionId); | ||
|
||
void clearPartitionOffset(int partitionId); | ||
|
||
void putStoreVersionState(StoreVersionState versionState); | ||
|
||
StoreVersionState getStoreVersionState(); | ||
|
||
void clearStoreVersionState(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So thinking aloud, StorageMetadataSErvice extends OffsetManager. I'm wondering if we can get away with that smaller interface? Though I guess looking at both interfaces, they operate on string imputs to topicName, which would imply we'd be leaking the kafka topic name construction to the user.
Maybe we need something more user friendly? I'm getting flashbacks to when we first tried to expose the storageEngine interface to users and realized it was maybe a bit too complicated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think with this PR overall I'm concerned we're leaking too much. I wonder if we can away with smaller interfaces with concepts the users understands like store name and version exclusively.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, can we add a
byte[]
interface that can be used directly?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, these are great comments. Though not necessarily straightforward to answer :D ... let me try...
First of all, I do agree that we should aim to make the abstraction as simple as possible. The fewer APIs the better, and likewise, the simpler each API is the better. It is with that intent that I've created the
CheckpointStorageEngine
interface, which contains a subset of theAbstractStorageEngine
APIs, and that I've made theStorageMetadataService
use that instead ofAbstractStorageEngine
. But it is good to ponder whether theCheckpointStorageEngine
is as simple as possible.In terms of simplifying further, two points are brought up,
topicName
and usingbyte[]
instead of strongly-typed objects.Regarding
topicName
, I think we could refactor things such that we eliminate it altogether... in the sense that a givenSIT
instance should always pass the sametopicName
in those params, so why bother passing it at all? That would be a simplification in one sense, although I worry that the implementer of a customSMS
may fail to realize that it ought to support keeping state for multiple topics at once. It would be a bug to let the checkpointing state of one store-version clobber that of another. Having it in the API makes that more explicit, and thus probably safer. Even if we do keep it, we could replace it with a stronger type thanString
, e.g.PubSubTopic
orStoreVersionName
. I think I lean towards the latter since it is more constrained (only version-topics should be passed in this param, but aPubSubTopic
can be any kind of topic, including RT topics, etc.). I can take a stab at making that change.Regarding
byte[]
versusOffsetRecord
andStoreVersionState
, I did think about it... ultimately, we do serialize those objects into bytes. And for the immediate use case I have in mind (DuckDB-based checkpointing) I would also use bytes (stored in aVARBINARY
column) to keep things simple. That being said, it is conceivable to build this thing differently. I could choose to map these data structures into full-blown tables where each field has its own dedicated column of the right type (like we do for the user's payloads). There is no need to do that, per say, but it could be interesting in terms of debuggability to be able to introspect the checkpointing state. If the API is bytes, then it is more awkward to do this (still possible of course, anything is possible...) whereas if I pass the strongly typed objects, I retain the flexibility of either serializing them to bytes or using all parts of their state. Of course, there could also be drawbacks to enabling this level of flexibility (e.g. the customSMS
implementer would need to deal with such things as schema evolution, whereas with opaque bytes we leave that concern at a higher level, within the core of the Venice code, while theSMS
implementer can ignore it...). I am not yet leaning one way or the other on this one... do you have any thoughts?Finally, there is one more thing which is to use just
OffsetManager
orStorageMetadataService
(which extends it). TheOM
provides read/write/delete APIs forOffsetRecord
, which is a per-partition kind of state, while theSMS
provides that + read/write/delete APIs forStoreVersionState
, which as the name implies is per-store-version. Currently we use both of these granularities of state, which I recognize is a bit inconvenient, though I'm not sure what would be a clean path forward here... If we did the refactoring of combining these two states into one, then we would duplicate theSVS
x the number of partitions we have. Notably, this includes bulky stuff like the compression dictionary, which I'm not thrilled about storing hundreds to thousands of times (in the case of stores with high partition count). I also worry that it may fragilize theSIT
since right now there is an assumption that theSVS
is guaranteed to be consistent across the whole store-version, but if we start handling it on a per-partition level, we might introduce bugs where theSVS
part of the newly augmented partition state is allowed to diverge between partitions (these bugs can be guarded against, but it just seems more error-prone).Anyway, sorry for the long post. I think these are important design considerations and I hope we can make thoughtful decisions about each aspect. Please LMK what you think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
slight maybe pendantic tweak on this it should also support view topics (which are a version-ed topic). Or maybe the interface understands store name and version and weather or not that translates to a view topic variant is something backed by config and a different kind of logic.
I'm still kind of mentally wrestling with this a bit, so my next notes might be stream of consciousness, apologies.
One 'ick' I think I'm having is exposing StoreVersionState to users directly. It contains a LOT of stuff and can be open to manipulation. I think though I get the way 'why' we'd like to have it. It's essentially our SIT checkpoint. (or at least, it's everything we need in order for ingestion to continue properly from some point). So I guess ultimately to feel better about it, we have the whole thing somehow obscured from the user enough to prevent them from trying to edit it or implement code which is based off of details we'd rather not be made public, but still portable enough for them to get it, store it, and give it back. CheckpointStorageEngine is the means to that end. Could we make maybe wrapper/accessor classes that are public and then take storeversionstate and storemetadataservice protected?
....I might have to pull this pr down and play around with it to give more coherent/helpful feedback
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The concern that it could be manipulated is valid... these are "live objects" and they are mutable. We definitely would not want a custom checkpointer to mutate them. This would favor Nisarg's suggestion of serializing them to an opaque bytes blob and passing that instead. A very motivated implementer could still deserialize the opaque blob and peer into it, but if they tampered with the bytes it wouldn't mess with our operational state.
Regarding view topics, I had forgotten about that, so I guess it ought to be a
PubSubTopic
object then.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if it's relevant but the current direction for DVC views implementation (#1466) is to leverage these adapters to decorate things as needed in centralized places (e.g. the store repository) so we don't need to have specialized logic to deal with it everywhere in order to understand and materialize a view store version. Similarly here I think passing view topics as
PubSubTopic
will work if we apply similar strategies. Astring
on the other hand will require implementer to be aware and understand view topics.