db: key fileCache by handle
The `fileCache` is integrated with the block Cache and uses the
`cache.ID` to separate files from different stores.

We can achieve the same separation by keying on the `*fileCacheHandle`
instead of the `cache.ID`, which simplifies things a bit. It would also
allow using a single file cache with multiple block caches if we ever
need that (perhaps for value separation).
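In code terms, the change replaces the `cache.ID` field of `fileCacheKey` with the handle pointer itself: every store already has its own `*fileCacheHandle`, so pointer identity provides the same per-store namespacing the ID did. A minimal standalone sketch of that idea, using simplified stand-in types rather than Pebble's real ones:

package main

import "fmt"

// Stand-ins: in Pebble the key holds a *fileCacheHandle and a
// base.DiskFileNum; here a small struct and a uint64 play those roles.
type storeHandle struct{ name string }

type fileKey struct {
	handle  *storeHandle // pointer identity acts as the per-store namespace
	fileNum uint64
}

func main() {
	nodes := make(map[fileKey]string)
	h1 := &storeHandle{name: "store-1"}
	h2 := &storeHandle{name: "store-2"}

	// The same file number under different handles maps to distinct
	// entries, just as it previously did under distinct cache IDs.
	nodes[fileKey{h1, 7}] = "node for store-1/000007"
	nodes[fileKey{h2, 7}] = "node for store-2/000007"

	fmt.Println(len(nodes))            // 2
	fmt.Println(nodes[fileKey{h1, 7}]) // node for store-1/000007
}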
RaduBerinde committed Feb 5, 2025
1 parent da87cfc commit 179abb8
Showing 6 changed files with 24 additions and 64 deletions.
58 changes: 20 additions & 38 deletions file_cache.go
@@ -107,19 +107,9 @@ type fileCacheHandle struct {
// newHandle creates a handle for the FileCache which has its own options. Each
// handle has its own set of files in the cache, separate from those of other
// handles.
//
// Each handle's cacheID must be unique.
//
// newHandle will panic if the underlying block cache in the file cache doesn't
// match Options.Cache.
func (c *FileCache) newHandle(
cacheID cache.ID, objProvider objstorage.Provider, opts *Options,
) *fileCacheHandle {
// We will release a ref to the file cache acquired here when
// fileCacheHandle.close is called.
if c.cache != opts.Cache {
panic("pebble: underlying cache for the file cache and db are different")
}
c.Ref()

t := &fileCacheHandle{
@@ -134,9 +124,9 @@ func (c *FileCache) newHandle(
return t
}

// Before calling close, make sure that there will be no further need
// Close the handle, make sure that there will be no further need
// to access any of the files associated with the store.
func (h *fileCacheHandle) close() error {
func (h *fileCacheHandle) Close() error {
// We want to do some cleanup work here. Check for leaked iterators
// by the DB using this container. Note that we'll still perform cleanup
// below in the case that there are leaked iterators.
@@ -151,8 +141,7 @@ func (h *fileCacheHandle) close() error {
shard.removeDB(h)
}
}
err = firstError(err, c.fileCache.Unref())
err = firstError(err, c.fileCache.Unref())
err = firstError(err, h.fileCache.Unref())
// TODO(radu): we have to tolerate metrics() calls after close (see
// https://github.com/cockroachdb/cockroach/issues/140454).
// *h = fileCacheHandle{}
@@ -271,7 +260,6 @@ func (h *fileCacheHandle) IterCount() int64 {
type FileCache struct {
refs atomic.Int64

cache *Cache
shards []*fileCacheShard
}

@@ -302,9 +290,6 @@ func (c *FileCache) Unref() error {
err = firstError(err, c.shards[i].Close())
}

// Unref the cache which we create a reference to when the file cache is
// first instantiated.
c.cache.Unref()
return err
}
return nil
@@ -313,16 +298,14 @@ func (c *FileCache) Unref() error {
// NewFileCache will create a new file cache with one outstanding reference. It
// is the callers responsibility to call Unref if they will no longer hold a
// reference to the file cache.
func NewFileCache(cache *Cache, numShards int, size int) *FileCache {
func NewFileCache(numShards int, size int) *FileCache {
if size == 0 {
panic("pebble: cannot create a file cache of size 0")
} else if numShards == 0 {
panic("pebble: cannot create a file cache with 0 shards")
}

c := &FileCache{}
c.cache = cache
c.cache.Ref()

c.shards = make([]*fileCacheShard, numShards)
for i := range c.shards {
@@ -341,7 +324,7 @@ func (c *FileCache) getShard(fileNum base.DiskFileNum) *fileCacheShard {
}

type fileCacheKey struct {
cacheID cache.ID
handle *fileCacheHandle
fileNum base.DiskFileNum
}

@@ -767,7 +750,7 @@ func (c *fileCacheShard) releaseNode(n *fileCacheNode) {
//
// c.mu must be held when calling this.
func (c *fileCacheShard) unlinkNode(n *fileCacheNode) {
key := fileCacheKey{n.cacheID, n.fileNum}
key := fileCacheKey{n.handle, n.fileNum}
delete(c.mu.nodes, key)

switch n.ptype {
@@ -842,7 +825,7 @@ func (c *fileCacheShard) findNodeInternal(
) *fileCacheValue {
// Fast-path for a hit in the cache.
c.mu.RLock()
key := fileCacheKey{handle.cacheID, backingFileNum}
key := fileCacheKey{handle, backingFileNum}
if n := c.mu.nodes[key]; n != nil && n.value != nil {
// Fast-path hit.
//
@@ -866,8 +849,9 @@ func (c *fileCacheShard) findNodeInternal(
n = &fileCacheNode{
fileNum: backingFileNum,
ptype: fileCacheNodeCold,
handle: handle,
}
c.addNode(n, handle)
c.addNode(n)
c.mu.sizeCold++

case n.value != nil:
@@ -892,7 +876,8 @@ func (c *fileCacheShard) findNodeInternal(

n.referenced.Store(false)
n.ptype = fileCacheNodeHot
c.addNode(n, handle)
n.handle = handle
c.addNode(n)
c.mu.sizeHot++
}

@@ -926,10 +911,9 @@ func (c *fileCacheShard) findNodeInternal(
return v
}

func (c *fileCacheShard) addNode(n *fileCacheNode, handle *fileCacheHandle) {
func (c *fileCacheShard) addNode(n *fileCacheNode) {
c.evictNodes()
n.cacheID = handle.cacheID
key := fileCacheKey{n.cacheID, n.fileNum}
key := fileCacheKey{n.handle, n.fileNum}
c.mu.nodes[key] = n

n.links.next = n
@@ -1025,7 +1009,7 @@ func (c *fileCacheShard) runHandTest() {

func (c *fileCacheShard) evict(fileNum base.DiskFileNum, handle *fileCacheHandle, allowLeak bool) {
c.mu.Lock()
key := fileCacheKey{handle.cacheID, fileNum}
key := fileCacheKey{handle, fileNum}
n := c.mu.nodes[key]
var v *fileCacheValue
if n != nil {
@@ -1055,9 +1039,8 @@ func (c *fileCacheShard) evict(fileNum base.DiskFileNum, handle *fileCacheHandle
handle.cache.EvictFile(handle.cacheID, fileNum)
}

// removeDB evicts any nodes which have a reference to the DB
// associated with handle.cacheID. Make sure that there will
// be no more accesses to the files associated with the DB.
// removeDB evicts any nodes associated with handle. Make sure that there will
// be no more accesses to files associated with the handle.
func (c *fileCacheShard) removeDB(handle *fileCacheHandle) {
var fileNums []base.DiskFileNum

@@ -1070,7 +1053,7 @@ func (c *fileCacheShard) removeDB(handle *fileCacheHandle) {
firstNode = node
}

if node.cacheID == handle.cacheID {
if node.handle == handle {
fileNums = append(fileNums, node.fileNum)
}
node = node.next()
@@ -1194,7 +1177,7 @@ func (v *fileCacheValue) load(
defer c.mu.Unlock()
// Lookup the node in the cache again as it might have already been
// removed.
key := fileCacheKey{handle.cacheID, backingFileNum}
key := fileCacheKey{handle, backingFileNum}
n := c.mu.nodes[key]
if n != nil && n.value == v {
c.releaseNode(n)
@@ -1246,9 +1229,8 @@ type fileCacheNode struct {
// since the last time one of the clock hands swept it.
referenced atomic.Bool

// Storing the cache id associated with the DB instance here
// avoids the need to thread the handle struct through many functions.
cacheID cache.ID
// A fileCacheNode is associated with a specific fileCacheHandle.
handle *fileCacheHandle
}

func (n *fileCacheNode) next() *fileCacheNode {
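The same handle-based identification shows up in `removeDB` above: instead of comparing cache IDs, it walks the shard's node ring and collects the file numbers of nodes whose handle pointer matches the closing handle. A rough standalone sketch of that traversal over a circular doubly-linked list (simplified stand-in types, not the real `fileCacheNode` or clock machinery):

package main

import "fmt"

type handle struct{ name string }

// node is a simplified stand-in for fileCacheNode: an entry in a circular
// doubly-linked list, tagged with the handle that owns it.
type node struct {
	fileNum    uint64
	owner      *handle
	prev, next *node
}

// insertAfter links n into the ring immediately after pos.
func insertAfter(pos, n *node) {
	n.prev, n.next = pos, pos.next
	pos.next.prev = n
	pos.next = n
}

// filesFor walks the ring starting at root and collects the file numbers
// owned by h, mirroring the pointer comparison removeDB now performs.
func filesFor(root *node, h *handle) []uint64 {
	var out []uint64
	for n := root; ; {
		if n.owner == h {
			out = append(out, n.fileNum)
		}
		n = n.next
		if n == root {
			break
		}
	}
	return out
}

func main() {
	h1, h2 := &handle{"store-1"}, &handle{"store-2"}
	root := &node{fileNum: 1, owner: h1}
	root.prev, root.next = root, root
	insertAfter(root, &node{fileNum: 2, owner: h2})
	insertAfter(root, &node{fileNum: 3, owner: h1})
	fmt.Println(filesFor(root, h1)) // [1 3]
}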
4 changes: 2 additions & 2 deletions file_cache_test.go
@@ -164,7 +164,7 @@ func newFileCacheTest(
t *testing.T, blockCacheSize int64, fileCacheSize int, fileCacheNumShards int,
) *fileCacheTest {
blockCache := NewCache(blockCacheSize)
fileCache := NewFileCache(blockCache, fileCacheNumShards, fileCacheSize)
fileCache := NewFileCache(fileCacheNumShards, fileCacheSize)
return &fileCacheTest{
T: t,
blockCache: blockCache,
@@ -1044,7 +1044,7 @@ func TestFileCacheErrorBadMagicNumber(t *testing.T) {
opts.EnsureDefaults()
opts.Cache = NewCache(8 << 20) // 8 MB
defer opts.Cache.Unref()
opts.FileCache = NewFileCache(opts.Cache, opts.Experimental.FileCacheShards, fileCacheTestCacheSize)
opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, fileCacheTestCacheSize)
defer opts.FileCache.Unref()
c := opts.FileCache.newHandle(opts.Cache.NewID(), objProvider, opts)
require.NoError(t, err)
2 changes: 1 addition & 1 deletion open.go
@@ -409,7 +409,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {

fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
if opts.FileCache == nil {
opts.FileCache = NewFileCache(opts.Cache, opts.Experimental.FileCacheShards, fileCacheSize)
opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
defer opts.FileCache.Unref()
}
d.fileCache = opts.FileCache.newHandle(d.cacheID, d.objProvider, d.opts)
2 changes: 1 addition & 1 deletion open_test.go
@@ -50,7 +50,7 @@ import (

func TestOpenSharedFileCache(t *testing.T) {
c := cache.New(cacheDefaultSize)
tc := NewFileCache(c, 16, 100)
tc := NewFileCache(16, 100)
defer tc.Unref()
defer c.Unref()

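Because `NewFileCache` no longer takes a block cache, one file cache could in principle be shared by DBs that use different block caches — the possibility the commit message mentions. `TestOpenSharedFileCache` above still shares a single block cache; the following is only a hedged sketch of what the mixed-cache usage might look like (directory names are placeholders, and this scenario is not exercised by the commit's tests):

package main

import "github.com/cockroachdb/pebble"

func main() {
	// One file cache shared across stores; no block cache argument anymore.
	fc := pebble.NewFileCache(16, 1000)
	defer fc.Unref()

	// Each DB keeps its own block cache.
	c1 := pebble.NewCache(64 << 20)
	defer c1.Unref()
	c2 := pebble.NewCache(64 << 20)
	defer c2.Unref()

	db1, err := pebble.Open("/tmp/demo-db1", &pebble.Options{Cache: c1, FileCache: fc})
	if err != nil {
		panic(err)
	}
	defer db1.Close()

	db2, err := pebble.Open("/tmp/demo-db2", &pebble.Options{Cache: c2, FileCache: fc})
	if err != nil {
		panic(err)
	}
	defer db2.Close()
}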
3 changes: 0 additions & 3 deletions options.go
@@ -2027,9 +2027,6 @@ func (o *Options) Validate() error {
fmt.Fprintf(&buf, "FormatMajorVersion (%d) when CreateOnShared is set must be at least %d\n",
o.FormatMajorVersion, FormatMinForSharedObjects)
}
if o.FileCache != nil && o.Cache != o.FileCache.cache {
fmt.Fprintf(&buf, "underlying cache in the FileCache and the Cache dont match\n")
}
if len(o.KeySchemas) > 0 {
if o.KeySchema == "" {
fmt.Fprintf(&buf, "KeySchemas is set but KeySchema is not\n")
19 changes: 0 additions & 19 deletions options_test.go
@@ -5,7 +5,6 @@
package pebble

import (
"fmt"
"math/rand/v2"
"runtime"
"strings"
@@ -371,24 +370,6 @@ func TestOptionsValidate(t *testing.T) {
}
}

// This test isn't being done in TestOptionsValidate
// cause it doesn't support setting pointers.
func TestOptionsValidateCache(t *testing.T) {
var opts Options
opts.EnsureDefaults()
opts.Cache = NewCache(8 << 20)
defer opts.Cache.Unref()
opts.FileCache = NewFileCache(NewCache(8<<20), 10, 1)
defer opts.FileCache.cache.Unref()
defer opts.FileCache.Unref()

err := opts.Validate()
require.Error(t, err)
if fmt.Sprint(err) != "underlying cache in the FileCache and the Cache dont match" {
t.Errorf("Unexpected error message")
}
}

func TestKeyCategories(t *testing.T) {
kc := MakeUserKeyCategories(base.DefaultComparer.Compare, []UserKeyCategory{
{Name: "b", UpperBound: []byte("b")},