
[dvc][duckdb] API change proposal to support DVRT checkpointing #1476

Open · wants to merge 3 commits into main
Conversation

@FelixGV (Contributor) commented Jan 27, 2025

This PR simply adds a getStorageMetadataService API to DVRT (the DaVinciRecordTransformer), with a default implementation returning null. If the SIT (StoreIngestionTask) gets null from DVRT, then nothing changes compared to today. If it gets a non-null instance, then read/write operations for OffsetRecord and StoreVersionState will go through the DVRT SMS (StorageMetadataService) instance, rather than the regular one.

This will allow DVRT to implement checkpointing. In the case of DuckDB, it means that the DuckDB file will be able to contain the checkpoint state, and the RocksDB metadata partition will be empty.
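
As a rough illustration of the flow described above, here is a minimal Java sketch. Apart from getStorageMetadataService itself, the class and method shapes are assumptions for illustration, not Venice's actual signatures:

```java
// Minimal sketch of the proposed hook; names other than
// getStorageMetadataService are illustrative stand-ins.
interface StorageMetadataService {
  // get/put/clear operations for OffsetRecord and StoreVersionState live here
}

abstract class DaVinciRecordTransformerSketch {
  /** The new API: default implementation returns null, preserving today's behavior. */
  public StorageMetadataService getStorageMetadataService() {
    return null;
  }
}

class StoreIngestionTaskSketch {
  // Conceptually, the SIT prefers the DVRT-provided SMS when one exists.
  StorageMetadataService resolve(DaVinciRecordTransformerSketch dvrt, StorageMetadataService regular) {
    StorageMetadataService fromDvrt = dvrt.getStorageMetadataService();
    return fromDvrt != null ? fromDvrt : regular;
  }
}
```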

How was this PR tested?

N/A

Does this PR introduce any user-facing changes?

  • No. You can skip the rest of this section.
  • Yes. Make sure to explain your proposed changes and call out the behavior change.

Commit notes:

- Also bumped duckdb dep to 1.2.0-20250127.011437-139
- …tions coming from AbstractStorageEngine. This new interface is what the StorageEngineMetadataService now uses internally, which should make it easier to implement the bare minimum needed for alternative checkpoint storage mechanisms.
- Removed Optional from the getPartitionOffset API's return type.
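
The commit notes mention a new interface carved out of AbstractStorageEngine (named CheckpointStorageEngine later in this thread). A hypothetical shape for such a subset interface might look like the following; the plain (non-Optional) return for getPartitionOffset follows the commit note above, but the method names and stub types are assumptions:

```java
// Stubs standing in for Venice's actual classes.
class OffsetRecord { /* per-partition ingestion checkpoint (stub) */ }
class StoreVersionState { /* per-store-version state (stub) */ }

// Hypothetical subset of AbstractStorageEngine covering only checkpoint state.
interface CheckpointStorageEngine {
  OffsetRecord getPartitionOffset(int partitionId); // plain return, no Optional; null if absent
  void putPartitionOffset(int partitionId, OffsetRecord record);
  void clearPartitionOffset(int partitionId);

  StoreVersionState getStoreVersionState();
  void putStoreVersionState(StoreVersionState state);
  void clearStoreVersionState();
}
```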
Review thread on the new API:

```java
@Override
public StorageMetadataService getStorageMetadataService() {
```
Contributor commented:
So thinking aloud, StorageMetadataService extends OffsetManager. I'm wondering if we can get away with that smaller interface? Though I guess, looking at both interfaces, they operate on String inputs for topicName, which would imply we'd be leaking the Kafka topic name construction to the user.

Maybe we need something more user-friendly? I'm getting flashbacks to when we first tried to expose the storageEngine interface to users and realized it was maybe a bit too complicated.

Contributor commented:
I think with this PR overall I'm concerned we're leaking too much. I wonder if we can get away with smaller interfaces, with concepts the user understands, like store name and version, exclusively.

Contributor commented:
Also, can we add a byte[] interface that can be used directly?

FelixGV (Contributor, Author) replied:
Thanks, these are great comments. Though not necessarily straightforward to answer :D ... let me try...

First of all, I do agree that we should aim to make the abstraction as simple as possible. The fewer APIs the better, and likewise, the simpler each API is the better. It is with that intent that I've created the CheckpointStorageEngine interface, which contains a subset of the AbstractStorageEngine APIs, and that I've made the StorageMetadataService use that instead of AbstractStorageEngine. But it is good to ponder whether the CheckpointStorageEngine is as simple as possible.

In terms of simplifying further, two points were brought up: topicName, and using byte[] instead of strongly-typed objects.

Regarding topicName, I think we could refactor things such that we eliminate it altogether... in the sense that a given SIT instance should always pass the same topicName in those params, so why bother passing it at all? That would be a simplification in one sense, although I worry that the implementer of a custom SMS may fail to realize that it ought to support keeping state for multiple topics at once. It would be a bug to let the checkpointing state of one store-version clobber that of another. Having it in the API makes that more explicit, and thus probably safer. Even if we do keep it, we could replace it with a stronger type than String, e.g. PubSubTopic or StoreVersionName. I think I lean towards the latter since it is more constrained (only version-topics should be passed in this param, but a PubSubTopic can be any kind of topic, including RT topics, etc.). I can take a stab at making that change.
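
To make that typing trade-off concrete, here is a hypothetical before/after. The method shape is invented for illustration; StoreVersionName is treated as an existing Venice type per the comment above:

```java
class StoreVersionName { /* stub for Venice's store-version name type */ }

interface StringlyTyped {
  // Any topic name can be passed here, including RT topics, by mistake.
  void putOffsetRecord(String topicName, int partitionId, byte[] state);
}

interface StronglyTyped {
  // The compiler constrains callers to a store-version.
  void putOffsetRecord(StoreVersionName storeVersion, int partitionId, byte[] state);
}
```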

Regarding byte[] versus OffsetRecord and StoreVersionState, I did think about it... ultimately, we do serialize those objects into bytes. And for the immediate use case I have in mind (DuckDB-based checkpointing), I would also use bytes (stored in a VARBINARY column) to keep things simple. That being said, it is conceivable to build this thing differently. I could choose to map these data structures into full-blown tables where each field has its own dedicated column of the right type (like we do for the user's payloads). There is no need to do that, per se, but it could be interesting in terms of debuggability to be able to introspect the checkpointing state. If the API is bytes, then it is more awkward to do this (still possible of course, anything is possible...) whereas if I pass the strongly typed objects, I retain the flexibility of either serializing them to bytes or using all parts of their state. Of course, there could also be drawbacks to enabling this level of flexibility (e.g. the custom SMS implementer would need to deal with such things as schema evolution, whereas with opaque bytes we leave that concern at a higher level, within the core of the Venice code, while the SMS implementer can ignore it...). I am not yet leaning one way or the other on this one... do you have any thoughts?
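
For the bytes-in-a-VARBINARY-column approach mentioned above, a sketch using DuckDB's JDBC driver could look like this. The table name and schema are made up for illustration (VARBINARY is a DuckDB alias for BLOB):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;

// Sketch: checkpoint state stored as an opaque blob in DuckDB.
class DuckDbCheckpointSketch {
  void saveCheckpoint(String dbPath, String topicName, int partitionId, byte[] offsetRecordBytes)
      throws SQLException {
    try (Connection conn = DriverManager.getConnection("jdbc:duckdb:" + dbPath)) {
      try (Statement s = conn.createStatement()) {
        s.execute("CREATE TABLE IF NOT EXISTS checkpoint_state ("
            + "topic_name VARCHAR, partition_id INTEGER, state VARBINARY, "
            + "PRIMARY KEY (topic_name, partition_id))");
      }
      // Upsert the serialized state for this topic-partition.
      try (PreparedStatement ps = conn.prepareStatement(
          "INSERT OR REPLACE INTO checkpoint_state VALUES (?, ?, ?)")) {
        ps.setString(1, topicName);
        ps.setInt(2, partitionId);
        ps.setBytes(3, offsetRecordBytes);
        ps.executeUpdate();
      }
    }
  }
}
```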

Finally, there is one more thing, which is whether to use just OffsetManager or StorageMetadataService (which extends it). The OM provides read/write/delete APIs for OffsetRecord, which is a per-partition kind of state, while the SMS provides that plus read/write/delete APIs for StoreVersionState, which, as the name implies, is per-store-version. Currently we use both of these granularities of state, which I recognize is a bit inconvenient, though I'm not sure what a clean path forward would be here... If we did the refactoring of combining these two states into one, then we would duplicate the SVS by the number of partitions we have. Notably, this includes bulky stuff like the compression dictionary, which I'm not thrilled about storing hundreds to thousands of times (in the case of stores with high partition counts). I also worry that it may make the SIT more fragile, since right now there is an assumption that the SVS is guaranteed to be consistent across the whole store-version, but if we start handling it on a per-partition level, we might introduce bugs where the SVS part of the newly augmented partition state is allowed to diverge between partitions (these bugs can be guarded against, but it just seems more error-prone). A simplified view of the two interfaces is sketched below.
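
For reference, a simplified rendering of the two granularities; the exact Venice method names and signatures may differ:

```java
class OffsetRecord { /* per-partition checkpoint state (stub) */ }
class StoreVersionState { /* per-store-version state (stub) */ }

// Per-partition state only.
interface OffsetManager {
  void put(String topicName, int partitionId, OffsetRecord record);
  OffsetRecord getLastOffset(String topicName, int partitionId);
  void clearOffset(String topicName, int partitionId);
}

// Adds per-store-version state on top of the per-partition state.
interface StorageMetadataService extends OffsetManager {
  StoreVersionState getStoreVersionState(String topicName);
  void putStoreVersionState(String topicName, StoreVersionState state);
  void clearStoreVersionState(String topicName);
}
```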

Anyway, sorry for the long post. I think these are important design considerations and I hope we can make thoughtful decisions about each aspect. Please LMK what you think.

@ZacAttack (Contributor) commented Jan 31, 2025:
> only version-topics should be passed in this param, but a PubSubTopic can be any kind of topic, including RT topics, etc.

A slight, maybe pedantic, tweak on this: it should also support view topics (which are versioned topics). Or maybe the interface understands store name and version, and whether or not that translates to a view topic variant is something backed by config and a different kind of logic.

I'm still kind of mentally wrestling with this a bit, so my next notes might be stream of consciousness, apologies.

One 'ick' I think I'm having is exposing StoreVersionState to users directly. It contains a LOT of stuff and can be open to manipulation. I think, though, I get the 'why': it's essentially our SIT checkpoint (or at least, it's everything we need in order for ingestion to continue properly from some point). So I guess ultimately, to feel better about it, we'd have the whole thing somehow obscured from the user, enough to prevent them from trying to edit it or implement code based off of details we'd rather not make public, but still portable enough for them to get it, store it, and give it back. CheckpointStorageEngine is the means to that end. Could we maybe make wrapper/accessor classes that are public, and then make StoreVersionState and StorageMetadataService protected?

... I might have to pull this PR down and play around with it to give more coherent/helpful feedback.
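
One hypothetical way to read the wrapper/accessor idea above: hand implementers an opaque, copy-safe blob rather than the live mutable object. This is purely illustrative, not anything from the PR:

```java
// Hypothetical opaque wrapper: implementers can store and return the blob,
// but cannot mutate Venice's live state through it.
public final class CheckpointBlob {
  private final byte[] bytes;

  CheckpointBlob(byte[] serializedState) {
    this.bytes = serializedState.clone(); // defensive copy on the way in
  }

  public byte[] toBytes() {
    return bytes.clone(); // defensive copy on the way out
  }
}
```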

FelixGV (Contributor, Author) replied:

The concern that it could be manipulated is valid... these are "live objects" and they are mutable. We definitely would not want a custom checkpointer to mutate them. This would favor Nisarg's suggestion of serializing them to an opaque bytes blob and passing that instead. A very motivated implementer could still deserialize the opaque blob and peer into it, but if they tampered with the bytes it wouldn't mess with our operational state.

Regarding view topics, I had forgotten about that, so I guess it ought to be a PubSubTopic object then.

@xunyin8 (Contributor) commented Jan 31, 2025:

Not sure if it's relevant, but the current direction for the DVC views implementation (#1466) is to leverage these adapters to decorate things as needed in centralized places (e.g. the store repository), so we don't need specialized logic everywhere in order to understand and materialize a view store version. Similarly, here I think passing view topics as PubSubTopic will work if we apply similar strategies. A String, on the other hand, will require the implementer to be aware of and understand view topics.

@FelixGV (Contributor, Author) commented Feb 5, 2025:

After gathering feedback on this proposal and thinking it through some more, I am thinking of changing the strategy a bit.

In particular, needing the checkpoint hook to support get/set/delete operations on both the PartitionState (wrapped inside of an OffsetRecord) as well as the StoreVersionState seems too cumbersome. First of all, it means implementing 6 APIs rather than 3; but also, in some contexts, the topic-level state is cumbersome to handle. For example, if we implemented a version of the checkpoint API which stores the metadata in Kafka's consumer group offset checkpoints (which do support attaching an extra blob), then there would be no obvious place to put the SVS (there are always workarounds, but they're not elegant...).

Considering the above, it seems ideal to extend the PS so that it contains some of the state currently located in the SVS, such that DVC could get everything it needs from just the PS, and somehow regenerate the SVS on demand if needed. In order to achieve this, some of the SVS fields can be copied to the PS, while others can be omitted entirely (e.g. the TopicSwitch field should be used only in the server code paths, and not in DVC, so it can be omitted). Finally, in the case of the compression dictionary, it would be preferable not to copy it into the PS, because it can be voluminous, which could be expensive for a high partition count store. Luckily, there is already a server endpoint which serves the dictionary (the one routers use), so it should not be too difficult to get it from there instead.

With this refactoring, the checkpointing API could be limited to just 3 APIs: get/set/delete the PS for a given partition. The partition can be modeled as a PubSubTopicPartition object, while the state itself can be a byte[], for the reasons discussed in the other thread.
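
A minimal sketch of what that narrowed hook could look like, assuming Venice's existing PubSubTopicPartition type; the interface name and exact signatures are illustrative:

```java
// Hypothetical shape of the narrowed checkpoint hook: three operations on an
// opaque, per-partition PartitionState blob, keyed by PubSubTopicPartition
// (an existing Venice type).
interface PartitionStateCheckpointer {
  byte[] getPartitionState(PubSubTopicPartition partition); // null if absent
  void putPartitionState(PubSubTopicPartition partition, byte[] serializedState);
  void deletePartitionState(PubSubTopicPartition partition);
}
```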
