diff --git a/index.go b/index.go
index 3d2389884..6ab7ccd2a 100644
--- a/index.go
+++ b/index.go
@@ -329,6 +329,24 @@ func OpenUsing(path string, runtimeConfig map[string]interface{}) (Index, error)
 	return openIndexUsing(path, runtimeConfig)
 }
 
+// Update index at the specified path, which must exist.
+// The mapping used when created will be overwritten by the mapping provided
+// for all Index/Search operations.
+// Returns an error, without making any changes to the index, if an unupdatable mapping is provided.
+func Update(path string, newParams string) (Index, error) {
+	return updateIndexUsing(path, nil, newParams)
+}
+
+// UpdateUsing index at the specified path, which must exist.
+// The mapping used when created will be overwritten by the mapping provided
+// for all Index/Search operations.
+// The provided runtimeConfig can override settings
+// persisted when the kvstore was created.
+// Returns an error, without making any changes to the index, if an unupdatable mapping is provided.
+func UpdateUsing(path string, runtimeConfig map[string]interface{}, newParams string) (Index, error) {
+	return updateIndexUsing(path, runtimeConfig, newParams)
+}
+
 // Builder is a limited interface, used to build indexes in an offline mode.
 // Items cannot be updated or deleted, and the caller MUST ensure a document is
 // indexed only once.
diff --git a/index/scorch/persister.go b/index/scorch/persister.go
index 5d79298d8..85c4579ca 100644
--- a/index/scorch/persister.go
+++ b/index/scorch/persister.go
@@ -859,7 +859,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
 		segmentBucket := snapshot.Bucket(k)
 		if segmentBucket == nil {
 			_ = rv.DecRef()
-			return nil, fmt.Errorf("segment key, but bucket missing % x", k)
+			return nil, fmt.Errorf("segment key, but bucket missing %x", k)
 		}
 		segmentSnapshot, err := s.loadSegment(segmentBucket)
 		if err != nil {
diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go
index 5925a19d7..e4703ecb1 100644
--- a/index/scorch/scorch.go
+++ b/index/scorch/scorch.go
@@ -37,7 +37,7 @@ const Version uint8 = 2
 
 var ErrClosed = fmt.Errorf("scorch closed")
 
-var mappingInternalKey = []byte("_mapping")
+var MappingInternalKey = []byte("_mapping")
 
 type Scorch struct {
 	nextSegmentID uint64
@@ -886,15 +886,27 @@ func (s *Scorch) FireIndexEvent() {
 	s.fireEvent(EventKindIndexStart, 0)
 }
 
+// Updates bolt db with the given field info. Existing field info already in bolt
+// will be merged before persisting. The index mapping is also overwritten both
+// in bolt as well as in the index snapshot.
 func (s *Scorch) UpdateFields(fieldInfo map[string]*index.FieldInfo, mappingBytes []byte) error {
-	err := s.updateBolt(fieldInfo, mappingBytes)
+	// Switch from pointer to value to marshal into JSON for storage
+	updatedFields := make(map[string]index.FieldInfo)
+	for field, info := range fieldInfo {
+		updatedFields[field] = *info
+	}
+	err := s.updateBolt(updatedFields, mappingBytes)
 	if err != nil {
 		return err
 	}
+	s.root.m.Lock()
+	s.root.updatedFields = updatedFields
+	s.root.m.Unlock()
 	return nil
 }
 
-func (s *Scorch) updateBolt(fieldInfo map[string]*index.FieldInfo, mappingBytes []byte) error {
+// Merge and update deleted field info and rewrite the index mapping
+func (s *Scorch) updateBolt(fieldInfo map[string]index.FieldInfo, mappingBytes []byte) error {
 	return s.rootBolt.Update(func(tx *bolt.Tx) error {
 		snapshots := tx.Bucket(boltSnapshotsBucket)
 		if snapshots == nil {
@@ -910,20 +922,20 @@
 		}
 		snapshot := snapshots.Bucket(k)
 		cc := snapshot.Cursor()
-		for kk, _ := cc.First(); kk != nil; kk, _ = c.Next() {
-			if k[0] == boltInternalKey[0] {
-				internalBucket := snapshot.Bucket(k)
+		for kk, _ := cc.First(); kk != nil; kk, _ = cc.Next() {
+			if kk[0] == boltInternalKey[0] {
+				internalBucket := snapshot.Bucket(kk)
 				if internalBucket == nil {
-					return fmt.Errorf("segment key, but bucket missing % x", k)
+					return fmt.Errorf("segment key, but bucket missing %x", kk)
 				}
-				err = internalBucket.Put(mappingInternalKey, mappingBytes)
+				err = internalBucket.Put(MappingInternalKey, mappingBytes)
 				if err != nil {
 					return err
 				}
-			} else if k[0] != boltMetaDataKey[0] {
-				segmentBucket := snapshot.Bucket(k)
+			} else if kk[0] != boltMetaDataKey[0] {
+				segmentBucket := snapshot.Bucket(kk)
 				if segmentBucket == nil {
-					return fmt.Errorf("segment key, but bucket missing % x", k)
+					return fmt.Errorf("segment key, but bucket missing %x", kk)
 				}
 				var updatedFields map[string]index.FieldInfo
 				updatedFieldBytes := segmentBucket.Get(boltUpdatedFieldsKey)
@@ -932,11 +944,11 @@
 					if err != nil {
 						return fmt.Errorf("error reading updated field bytes: %v", err)
 					}
+					for field, info := range fieldInfo {
+						updatedFields[field] = info
+					}
 				} else {
-					updatedFields = make(map[string]index.FieldInfo)
-				}
-				for field, info := range fieldInfo {
-					updatedFields[field] = *info
+					updatedFields = fieldInfo
 				}
 				b, err := json.Marshal(updatedFields)
 				if err != nil {
@@ -949,7 +961,6 @@
 				}
 			}
 		}
-
 		return nil
 	})
 }
diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go
index 19d24dc83..65232f9c2 100644
--- a/index/scorch/snapshot_index.go
+++ b/index/scorch/snapshot_index.go
@@ -466,10 +466,12 @@ func (is *IndexSnapshot) Document(id string) (rv index.Document, err error) {
 			// Keeping that TODO for now until we have a cleaner way.
 			rvd.StoredFieldsSize += uint64(len(val))
 
+			// Skip fields that are supposed to have deleted store values
 			if info, ok := is.updatedFields[name]; ok && (info.All || info.Store) {
 				return true
 			}
 
+			// copy value, array positions to preserve them beyond the scope of this callback
 			value := append([]byte(nil), val...)
 			arrayPos := append([]uint64(nil), pos...)
@@ -612,6 +614,8 @@ func (is *IndexSnapshot) TermFieldReader(ctx context.Context, term []byte, field
 
 		var dict segment.TermDictionary
 		var err error
+
+		// Skip fields that are supposed to have no indexing
 		if info, ok := is.updatedFields[field]; ok && (info.Index || info.All) {
 			dict = nil
@@ -621,6 +625,7 @@ func (is *IndexSnapshot) TermFieldReader(ctx context.Context, term []byte, field
 		if err != nil {
 			return nil, err
 		}
+
 		if dictStats, ok := dict.(segment.DiskStatsReporter); ok {
 			bytesRead := dictStats.BytesRead()
 			rv.incrementBytesRead(bytesRead)
@@ -765,6 +770,7 @@ func (is *IndexSnapshot) documentVisitFieldTermsOnSegment(
 		}
 	}
 
+	// Filter out fields that are supposed to have no doc values
 	var filteredFields []string
 	for _, field := range vFields {
 		if info, ok := is.updatedFields[field]; ok &&
@@ -797,8 +803,8 @@ func (is *IndexSnapshot) documentVisitFieldTermsOnSegment(
 		}
 	}
 
-	if ssvOk && ssv != nil && len(vFields) > 0 {
-		dvs, err = ssv.VisitDocValues(localDocNum, fields, visitor, dvs)
+	if ssvOk && ssv != nil && len(filteredFields) > 0 {
+		dvs, err = ssv.VisitDocValues(localDocNum, filteredFields, visitor, dvs)
 		if err != nil {
 			return nil, nil, err
 		}
diff --git a/index_impl.go b/index_impl.go
index b9cf48452..69ad3fbbf 100644
--- a/index_impl.go
+++ b/index_impl.go
@@ -57,8 +57,6 @@ type indexImpl struct {
 
 const storePath = "store"
 
-var mappingInternalKey = []byte("_mapping")
-
 const SearchQueryStartCallbackKey = "_search_query_start_callback_key"
 const SearchQueryEndCallbackKey = "_search_query_end_callback_key"
 
@@ -129,7 +127,7 @@ func newIndexUsing(path string, mapping mapping.IndexMapping, indexType string,
 	if err != nil {
 		return nil, err
 	}
-	err = rv.i.SetInternal(mappingInternalKey, mappingBytes)
+	err = rv.i.SetInternal(scorch.MappingInternalKey, mappingBytes)
 	if err != nil {
 		return nil, err
 	}
@@ -164,25 +162,110 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
 		storeConfig = map[string]interface{}{}
 	}
 
+	storeConfig["path"] = indexStorePath(path)
+	storeConfig["create_if_missing"] = false
+	storeConfig["error_if_exists"] = false
+	for rck, rcv := range runtimeConfig {
+		storeConfig[rck] = rcv
+	}
+
+	// open the index
+	indexTypeConstructor := registry.IndexTypeConstructorByName(rv.meta.IndexType)
+	if indexTypeConstructor == nil {
+		return nil, ErrorUnknownIndexType
+	}
+
+	rv.i, err = indexTypeConstructor(rv.meta.Storage, storeConfig, Config.analysisQueue)
+	if err != nil {
+		return nil, err
+	}
+	err = rv.i.Open()
+	if err != nil {
+		return nil, err
+	}
+	defer func(rv *indexImpl) {
+		if !rv.open {
+			rv.i.Close()
+		}
+	}(rv)
+
+	// now load the mapping
+	indexReader, err := rv.i.Reader()
+	if err != nil {
+		return nil, err
+	}
+	defer func() {
+		if cerr := indexReader.Close(); cerr != nil && err == nil {
+			err = cerr
+		}
+	}()
+
+	mappingBytes, err := indexReader.GetInternal(scorch.MappingInternalKey)
+	if err != nil {
+		return nil, err
+	}
+
+	var im *mapping.IndexMappingImpl
+	err = util.UnmarshalJSON(mappingBytes, &im)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing mapping JSON: %v\nmapping contents:\n%s", err, string(mappingBytes))
+	}
+
+	// mark the index as open
+	rv.mutex.Lock()
+	defer rv.mutex.Unlock()
+	rv.open = true
+
+	// validate the mapping
+	err = im.Validate()
+	if err != nil {
+		// note even if the mapping is invalid
+		// we still return an open usable index
+		return rv, err
+	}
+
+	rv.m = im
+	indexStats.Register(rv)
+	return rv, err
+}
+
+func updateIndexUsing(path string, runtimeConfig map[string]interface{},
+	newParams string) (rv *indexImpl, err error) {
+	rv = &indexImpl{
+		path: path,
+		name: path,
+	}
+	rv.stats = &IndexStat{i: rv}
+
+	rv.meta, err = openIndexMeta(path)
+	if err != nil {
+		return nil, err
+	}
+
+	// backwards compatibility if index type is missing
+	if rv.meta.IndexType == "" {
+		rv.meta.IndexType = upsidedown.Name
+	}
+
+	storeConfig := rv.meta.Config
+	if storeConfig == nil {
+		storeConfig = map[string]interface{}{}
+	}
+
 	var um *mapping.IndexMappingImpl
-	var umBytes []byte
+
+	if len(newParams) == 0 {
+		return nil, fmt.Errorf("updated mapping is empty")
+	}
+
+	err = util.UnmarshalJSON([]byte(newParams), &um)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing updated mapping JSON: %v\nmapping contents:\n%s", err, newParams)
+	}
 
 	storeConfig["path"] = indexStorePath(path)
 	storeConfig["create_if_missing"] = false
 	storeConfig["error_if_exists"] = false
 	for rck, rcv := range runtimeConfig {
-		if rck == "mapping" {
-			if val, ok := rcv.([]byte); ok {
-				err = util.UnmarshalJSON(val, &um)
-				if err != nil {
-					return nil, fmt.Errorf("error parsing updated mapping JSON: %v\nmapping contents:\n%s", err, val)
-				}
-				umBytes = val
-			} else {
-				return nil, fmt.Errorf("error typecasting updated mapping JSON\nmapping contents: %v", rcv)
-			}
-			continue
-		}
 		storeConfig[rck] = rcv
 	}
@@ -196,6 +279,7 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
 	if err != nil {
 		return nil, err
 	}
+
 	err = rv.i.Open()
 	if err != nil {
 		return nil, err
@@ -217,7 +301,7 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
 		}
 	}()
 
-	mappingBytes, err := indexReader.GetInternal(mappingInternalKey)
+	mappingBytes, err := indexReader.GetInternal(scorch.MappingInternalKey)
 	if err != nil {
 		return nil, err
 	}
@@ -241,6 +325,7 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
 		return rv, err
 	}
 
+	// Validate and update the index with the new mapping
 	if um != nil {
 		ui, ok := rv.i.(index.UpdateIndex)
 		if !ok {
@@ -252,15 +337,16 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
 			return rv, err
 		}
 
-		fieldInfo, err := deletedFields(im, um)
+		fieldInfo, err := DeletedFields(im, um)
 		if err != nil {
 			return rv, err
 		}
 
-		err = ui.UpdateFields(fieldInfo, umBytes)
+		err = ui.UpdateFields(fieldInfo, []byte(newParams))
 		if err != nil {
 			return rv, err
 		}
+		im = um
 	}
 
 	rv.m = im
diff --git a/index_update.go b/index_update.go
index 5e4dbcd52..c7228f532 100644
--- a/index_update.go
+++ b/index_update.go
@@ -1,4 +1,4 @@
-// Copyright (c) 2024 Couchbase, Inc.
+// Copyright (c) 2025 Couchbase, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -21,6 +21,8 @@ import (
 	index "github.com/blevesearch/bleve_index_api"
 )
 
+// Store all the fields that interact with the data
+// from a document path
 type pathInfo struct {
 	fieldMapInfo []*fieldMapInfo
 	dynamic      bool
@@ -28,15 +30,20 @@ type pathInfo struct {
 	path       string
 	parentPath string
 }
 
+// Store the field information with respect to the
+// document paths
 type fieldMapInfo struct {
 	fieldMapping *mapping.FieldMapping
 	rootName     string
 	parent       *pathInfo
 }
 
-func deletedFields(ori, upd *mapping.IndexMappingImpl) (map[string]*index.FieldInfo, error) {
-
+// DeletedFields compares two index mappings to identify all of the updatable changes
+func DeletedFields(ori, upd *mapping.IndexMappingImpl) (map[string]*index.FieldInfo, error) {
 	var err error
+
+	// Check for new mappings present in the type mappings
+	// of the updated compared to the original
 	for name, updDMapping := range upd.TypeMapping {
 		err = checkUpdatedMapping(ori.TypeMapping[name], updDMapping)
 		if err != nil {
@@ -44,6 +51,8 @@ func deletedFields(ori, upd *mapping.IndexMappingImpl) (map[string]*index.FieldI
 		}
 	}
 
+	// Check for new mappings present in the default mappings
+	// of the updated compared to the original
 	err = checkUpdatedMapping(ori.DefaultMapping, upd.DefaultMapping)
 	if err != nil {
 		return nil, err
@@ -52,16 +61,23 @@
 	oriPaths := make(map[string]*pathInfo)
 	updPaths := make(map[string]*pathInfo)
 
+	// Go through each mapping present in the original
+	// and consolidate according to the document paths
 	for name, oriDMapping := range ori.TypeMapping {
 		addPathInfo(oriPaths, "", oriDMapping, ori, nil, name)
 	}
 	addPathInfo(oriPaths, "", ori.DefaultMapping, ori, nil, "")
 
+	// Go through each mapping present in the updated
+	// and consolidate according to the document paths
 	for name, updDMapping := range upd.TypeMapping {
 		addPathInfo(updPaths, "", updDMapping, ori, nil, name)
 	}
 	addPathInfo(updPaths, "", upd.DefaultMapping, ori, nil, "")
 
+	// Compare both the mappings based on the document paths
+	// and create a list of index, docvalues, store differences
+	// for every single field possible
 	fieldInfo := make(map[string]*index.FieldInfo)
 	for path, info := range oriPaths {
 		err = addFieldInfo(fieldInfo, info, updPaths[path])
@@ -70,6 +86,8 @@ func deletedFields(ori, upd *mapping.IndexMappingImpl) (map[string]*index.FieldI
 		}
 	}
 
+	// Remove entries from the list with no changes between the
+	// original and the updated mapping
 	for name, info := range fieldInfo {
 		if !info.All && !info.Index && !info.DocValues && !info.Store {
 			delete(fieldInfo, name)
@@ -78,10 +96,12 @@
 	return fieldInfo, nil
 }
 
-// Function to ensure updated document mapping does not contain new field mappings
-// or document mappings
+// Ensures updated document mapping does not contain new
+// field mappings or document mappings
 func checkUpdatedMapping(ori, upd *mapping.DocumentMapping) error {
 
+	// Check to verify both original and updated are not nil
+	// and are enabled before proceeding
 	if ori == nil {
 		if upd == nil || !upd.Enabled {
 			return nil
@@ -94,6 +114,7 @@ func checkUpdatedMapping(ori, upd *mapping.DocumentMapping) error {
 	}
 
 	var err error
+	// Recursively go through the child mappings
 	for name, updDMapping := range upd.Properties {
 		err = checkUpdatedMapping(ori.Properties[name], updDMapping)
 		if err != nil {
@@ -101,6 +122,8 @@ func checkUpdatedMapping(ori, upd *mapping.DocumentMapping) error {
 		}
 	}
 
+	// Simple checks to ensure no new field mappings are present
+	// in the updated mapping
 	for _, updFMapping := range upd.Fields {
 		var oriFMapping *mapping.FieldMapping
 
@@ -117,13 +140,20 @@ func checkUpdatedMapping(ori, upd *mapping.DocumentMapping) error {
 	return nil
 }
 
+// Adds all of the field mappings while maintaining a tree of the document structure
+// to ensure traversal and verification is possible in case of multiple mappings defined
+// for a single field or multiple document fields' data getting written to a single zapx field
 func addPathInfo(paths map[string]*pathInfo, name string, mp *mapping.DocumentMapping,
 	im *mapping.IndexMappingImpl, parent *pathInfo, rootName string) {
 
+	// Early exit if mapping has been disabled
+	// Comparisons later on will be done with a nil object
 	if !mp.Enabled {
 		return
 	}
 
+	// Consolidate path information like index dynamic across multiple
+	// mappings if path is the same
 	var pInfo *pathInfo
 	if val, ok := paths[name]; ok {
 		pInfo = val
@@ -140,6 +170,7 @@ func addPathInfo(paths map[string]*pathInfo, name string, mp *mapping.DocumentMa
 		pInfo.parentPath = parent.path
 	}
 
+	// Recursively add path information for all child mappings
 	for cName, cMapping := range mp.Properties {
 		var pathName string
 		if name == "" {
@@ -150,6 +181,7 @@ func addPathInfo(paths map[string]*pathInfo, name string, mp *mapping.DocumentMa
 		addPathInfo(paths, pathName, cMapping, im, pInfo, rootName)
 	}
 
+	// Add field mapping information keeping the document structure intact
 	for _, fMap := range mp.Fields {
 		fieldMapInfo := &fieldMapInfo{
 			fieldMapping: fMap,
@@ -162,12 +194,15 @@ func addPathInfo(paths map[string]*pathInfo, name string, mp *mapping.DocumentMa
 	paths[name] = pInfo
 }
 
+// Compare all of the fields at a particular document path and add their field information
func addFieldInfo(fInfo map[string]*index.FieldInfo, ori, upd *pathInfo) error {
 	var info *index.FieldInfo
 	var updated bool
 	var err error
 
+	// Assume deleted or disabled mapping if upd is nil. Checks for ori being nil
+	// or upd having mappings not in ori have already been done before this stage
 	if upd == nil {
 		for _, oriFMapInfo := range ori.fieldMapInfo {
 			info, updated, err = compareFieldMapping(oriFMapInfo.fieldMapping, nil)
@@ -182,6 +217,8 @@ func addFieldInfo(fInfo map[string]*index.FieldInfo, ori, upd *pathInfo) error {
 	} else {
 		for _, oriFMapInfo := range ori.fieldMapInfo {
 			var updFMap *mapping.FieldMapping
+			// For multiple fields at a single document path, compare
+			// only with the matching ones
 			for _, updFMapInfo := range upd.fieldMapInfo {
 				if oriFMapInfo.rootName == updFMapInfo.rootName &&
 					oriFMapInfo.fieldMapping.Name == updFMapInfo.fieldMapping.Name {
@@ -206,31 +243,13 @@ func addFieldInfo(fInfo map[string]*index.FieldInfo, ori, upd *pathInfo) error {
 	return nil
 }
 
-func validateFieldInfo(newInfo *index.FieldInfo, updated bool, fInfo map[string]*index.FieldInfo,
-	ori *pathInfo) error {
-
-	var name string
-	if ori.fieldMapInfo[0].parent.parentPath == "" {
-		name = ori.fieldMapInfo[0].fieldMapping.Name
-	} else {
-		name = ori.fieldMapInfo[0].parent.parentPath + "." + ori.fieldMapInfo[0].fieldMapping.Name
-	}
-	if updated {
-		if ori.dynamic {
-			return fmt.Errorf("updated field is under a dynamic property")
-		}
-	}
-	if oldInfo, ok := fInfo[name]; ok {
-		if oldInfo.All != newInfo.All || oldInfo.Index != newInfo.Index ||
-			oldInfo.DocValues != newInfo.DocValues || oldInfo.Store != newInfo.Store {
-			return fmt.Errorf("updated field impossible to verify because multiple mappings point to the same field name")
-		}
-	} else {
-		fInfo[name] = newInfo
-	}
-	return nil
-}
-
+// Compares two field mappings against each other, checking for changes in index, store, doc values
+// and complete deletion of the mapping, while noting that the changes made are doable based on
+// other values like includeInAll and dynamic.
+// The first return value gives an empty fieldInfo if no changes are detected.
+// The second return value is a flag indicating whether any changes, if detected, are doable or
+// whether the update is impossible.
+// The third return value is an error explaining exactly why the change is not possible.
 func compareFieldMapping(original, updated *mapping.FieldMapping) (*index.FieldInfo, bool, error) {
 	rv := &index.FieldInfo{}
 
@@ -313,3 +332,30 @@ func compareFieldMapping(original, updated *mapping.FieldMapping) (*index.FieldI
 	}
 	return rv, false, nil
 }
+
+// After identifying changes, validate against the existing changes in case of duplicate fields.
+// In such a situation, any conflicting changes found will abort the update process
+func validateFieldInfo(newInfo *index.FieldInfo, updated bool, fInfo map[string]*index.FieldInfo,
+	ori *pathInfo) error {
+
+	var name string
+	if ori.fieldMapInfo[0].parent.parentPath == "" {
+		name = ori.fieldMapInfo[0].fieldMapping.Name
+	} else {
+		name = ori.fieldMapInfo[0].parent.parentPath + "." + ori.fieldMapInfo[0].fieldMapping.Name
+	}
+	if updated {
+		if ori.dynamic {
+			return fmt.Errorf("updated field is under a dynamic property")
+		}
+	}
+	if oldInfo, ok := fInfo[name]; ok {
+		if oldInfo.All != newInfo.All || oldInfo.Index != newInfo.Index ||
+			oldInfo.DocValues != newInfo.DocValues || oldInfo.Store != newInfo.Store {
+			return fmt.Errorf("updated field impossible to verify because multiple mappings point to the same field name")
+		}
+	} else {
+		fInfo[name] = newInfo
+	}
+	return nil
+}
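
Usage sketch (illustration only, not part of the patch): the snippet below shows how the Update entry point added in index.go might be called once this change lands. The index path and the example mapping JSON are assumptions for demonstration, and the bleve v2 import path is assumed for this branch; only the Update/UpdateUsing signatures come from the diff above. Whether a given mapping change is accepted is decided by DeletedFields when the index is reopened.

package main

import (
	"log"

	bleve "github.com/blevesearch/bleve/v2" // assumed import path for this branch
)

func main() {
	// newParams holds the updated index mapping as JSON. Per the diff, an updated
	// mapping may only remove or narrow existing fields (index/store/docvalues);
	// introducing new field or document mappings is rejected. This particular JSON
	// body is hypothetical and only illustrates the call shape.
	newParams := `{
		"default_mapping": {
			"enabled": true,
			"properties": {
				"description": { "enabled": false }
			}
		}
	}`

	// Update reopens the existing index at the given path with the new mapping,
	// returning an error (and leaving the index untouched) if the change is not
	// an updatable one.
	idx, err := bleve.Update("example.bleve", newParams)
	if err != nil {
		log.Fatalf("mapping update rejected: %v", err)
	}
	defer idx.Close()

	// UpdateUsing behaves the same, but additionally lets persisted kvstore
	// settings be overridden via the runtimeConfig map (keys are store-specific).
}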