Skip to content

Commit

Permalink
node/metabase: Support ROOT and PHY search filters in new meta bucket (
Browse files Browse the repository at this point in the history
  • Loading branch information
cthulhu-rider authored Feb 13, 2025
2 parents eb90734 + 650b801 commit ef63ec1
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 19 deletions.
38 changes: 29 additions & 9 deletions pkg/local_object_storage/metabase/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
attrIDFixedLen = 1 + oid.Size + utf8DelimiterLen // prefix first
)

const binPropMarker = "1" // ROOT, PHY, etc.

var (
maxUint256 = new(big.Int).SetBytes([]byte{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255})
Expand All @@ -45,10 +47,10 @@ func invalidMetaBucketKeyErr(key []byte, cause error) error {
}

// TODO: fill on migration.
// TODO: ROOT and PHY props.
// TODO: cleaning on obj removal.
func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner user.ID, typ object.Type, creationEpoch uint64,
payloadLen uint64, pldHash, pldHmmHash, splitID []byte, parentID, firstID oid.ID, attrs []object.Attribute) error {
payloadLen uint64, pldHash, pldHmmHash, splitID []byte, parentID, firstID oid.ID, attrs []object.Attribute,
root, phy bool) error {
metaBkt, err := tx.CreateBucketIfNotExists(metaBucketKey(cnr))
if err != nil {
return fmt.Errorf("create meta bucket for container: %w", err)
Expand Down Expand Up @@ -96,6 +98,16 @@ func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner
return err
}
}
if root {
if err = putPlainAttribute(metaBkt, &keyBuf, id, object.FilterRoot, binPropMarker); err != nil {
return err
}
}
if phy {
if err = putPlainAttribute(metaBkt, &keyBuf, id, object.FilterPhysical, binPropMarker); err != nil {
return err
}
}
for i := range attrs {
ak, av := attrs[i].Key(), attrs[i].Value()
if n, isInt := parseInt(av); isInt && n.Cmp(maxUint256Neg) >= 0 && n.Cmp(maxUint256) <= 0 {
Expand Down Expand Up @@ -195,7 +207,7 @@ func (db *DB) search(cnr cid.ID, fs object.SearchFilters, attrs []string, cursor
func (db *DB) searchInBucket(metaBkt *bbolt.Bucket, fs object.SearchFilters, attrs []string,
cursor *SearchCursor, count uint16) ([]client.SearchResultItem, *SearchCursor, error) {
// TODO: make as much as possible outside the Bolt tx
primMatcher := fs[0].Operation()
primMatcher, primVal := convertFilterValue(fs[0])
intPrimMatcher := isNumericOp(primMatcher)
notPresentPrimMatcher := primMatcher == object.MatchNotPresent
primAttr := fs[0].Header() // attribute emptiness already prevented
Expand Down Expand Up @@ -223,7 +235,7 @@ func (db *DB) searchInBucket(metaBkt *bbolt.Bucket, fs object.SearchFilters, att
if primMatcher == object.MatchStringEqual || primMatcher == object.MatchCommonPrefix ||
primMatcher == object.MatchNumGT || primMatcher == object.MatchNumGE {
var err error
if primSeekKey, primSeekPrefix, err = seekKeyForAttribute(primAttr, fs[0].Value()); err != nil {
if primSeekKey, primSeekPrefix, err = seekKeyForAttribute(primAttr, primVal); err != nil {
return nil, nil, fmt.Errorf("invalid primary filter value: %w", err)
}
} else {
Expand Down Expand Up @@ -279,11 +291,12 @@ nextPrimKey:
if i > 0 && attr != primAttr {
continue
}
checkedDBVal, fltVal, err := combineValues(attr, dbVal, fs[i].Value()) // TODO: deduplicate DB value preparation
mch, val := convertFilterValue(fs[i])
checkedDBVal, fltVal, err := combineValues(attr, dbVal, val) // TODO: deduplicate DB value preparation
if err != nil {
return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid attribute %s value: %w", attr, err)
}
if !matchValues(checkedDBVal, fs[i].Operation(), fltVal) {
if !matchValues(checkedDBVal, mch, fltVal) {
continue nextPrimKey
}
// TODO: attribute value can be requested, it can be collected here, or we can
Expand Down Expand Up @@ -311,7 +324,7 @@ nextPrimKey:
if j > 0 && fs[j].Header() != attr {
continue
}
m := fs[j].Operation()
m, val := convertFilterValue(fs[j])
if dbVal == nil {
if m == object.MatchNotPresent {
continue
Expand Down Expand Up @@ -347,7 +360,7 @@ nextPrimKey:
} else {
checkedDBVal = dbVal
}
checkedDBVal, fltVal, err := combineValues(attr, checkedDBVal, fs[j].Value()) // TODO: deduplicate DB value preparation
checkedDBVal, fltVal, err := combineValues(attr, checkedDBVal, val) // TODO: deduplicate DB value preparation
if err != nil {
return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("invalid attribute %s value: %w", attr, err))
}
Expand Down Expand Up @@ -507,7 +520,7 @@ func seekKeyForAttribute(attr, fltVal string) ([]byte, []byte, error) {
return nil, nil, fmt.Errorf("decode %q UUID attribute: %w", attr, err)
}
dbVal = uid[:]
case object.FilterVersion, object.FilterType:
case object.FilterVersion, object.FilterType, object.FilterRoot, object.FilterPhysical:
}
key := make([]byte, 1+len(attr)+utf8DelimiterLen+len(dbVal)) // prefix 1st
key[0] = metaPrefixAttrIDPlain
Expand Down Expand Up @@ -794,3 +807,10 @@ func (x *metaAttributeSeeker) restoreVal(id []byte, attr string, stored []byte)
}
return string(stored), nil
}

func convertFilterValue(f object.SearchFilter) (object.SearchMatchType, string) {
if attr := f.Header(); attr == object.FilterRoot || attr == object.FilterPhysical {
return object.MatchStringEqual, binPropMarker
}
return f.Operation(), f.Value()
}
16 changes: 8 additions & 8 deletions pkg/local_object_storage/metabase/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ func TestPutMetadata(t *testing.T) {
0, 0, 0, 0, 101, 118, 30, 154, 145, 227, 159, 231})
assertIntAttr(t, mb, id, "$Object:payloadLength", []byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 29, 7, 76, 78, 96, 175, 200, 130})
assertAttr(t, mb, id, "$Object:ROOT", "1")
assertAttr(t, mb, id, "$Object:PHY", "1")
assertAttr(t, mb, id, "attr_1", "val_1")
assertAttr(t, mb, id, "attr_2", "val_2")
assertAttr(t, mb, id, "num_negative_overflow", "-115792089237316195423570985008687907853269984665640564039457584007913129639936")
Expand Down Expand Up @@ -670,24 +672,22 @@ func TestDB_SearchObjects(t *testing.T) {
}
})
t.Run("ROOT", func(t *testing.T) {
t.Skip("not supported yet")
check("$Object:ROOT", 0, "", []uint{0, 1})
for _, matcher := range []object.SearchMatchType{
object.MatchStringEqual, object.MatchStringNotEqual, object.MatchNotPresent, object.MatchCommonPrefix,
object.MatchUnspecified, object.MatchStringEqual, object.MatchStringNotEqual,
object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE,
} {
check("$Object:ROOT", matcher, "", nil)
check("$Object:ROOT", matcher, "", []uint{0, 1})
}
check("$Object:ROOT", object.MatchNotPresent, "", nil)
})
t.Run("PHY", func(t *testing.T) {
t.Skip("not supported yet")
check("$Object;PHY", 0, "", []uint{0, 1, 2, 3})
for _, matcher := range []object.SearchMatchType{
object.MatchStringEqual, object.MatchStringNotEqual, object.MatchNotPresent, object.MatchCommonPrefix,
object.MatchUnspecified, object.MatchStringEqual, object.MatchStringNotEqual,
object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE,
} {
check("$Object:PHY", matcher, "", nil)
check("$Object:PHY", matcher, "", []uint{2, 3, 4, 5})
}
check("$Object:PHY", object.MatchNotPresent, "", nil)
})
t.Run("version", func(t *testing.T) {
check := func(m object.SearchMatchType, v string, matchInds []uint) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/local_object_storage/metabase/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func (db *DB) put(
return nil
}

if par := obj.Parent(); par != nil && !isParent { // limit depth by two
par := obj.Parent()
if par != nil && !isParent { // limit depth by two
if parID := par.GetID(); !parID.IsZero() { // skip the first object without useful info
parentSI, err := splitInfoFromObject(obj)
if err != nil {
Expand Down Expand Up @@ -171,7 +172,7 @@ func (db *DB) put(
pldHmmHash = h.Value()
}
if err := putMetadata(tx, cnr, obj.GetID(), ver, *owner, obj.Type(), obj.CreationEpoch(), obj.PayloadSize(),
pldHash.Value(), pldHmmHash, obj.SplitID().ToV2(), obj.GetParentID(), obj.GetFirstID(), obj.Attributes()); err != nil {
pldHash.Value(), pldHmmHash, obj.SplitID().ToV2(), obj.GetParentID(), obj.GetFirstID(), obj.Attributes(), par == nil, !isParent); err != nil {
return fmt.Errorf("put metadata: %w", err)
}

Expand Down

0 comments on commit ef63ec1

Please sign in to comment.