Skip to content

Commit

Permalink
wip: initial ethwal filter index
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubhaankar-Sharma committed Sep 10, 2024
1 parent 41b3251 commit 2c9d4fe
Show file tree
Hide file tree
Showing 15 changed files with 657 additions and 11 deletions.
156 changes: 156 additions & 0 deletions filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package ethwal

import (
"context"
"fmt"

"github.com/0xsequence/ethwal/storage"
"github.com/RoaringBitmap/roaring/v2/roaring64"
)

type Filter interface {
Eval() FilterIterator
}

type FilterIterator interface {
HasNext() bool
Next() (uint64, uint16)
Bitmap() *roaring64.Bitmap
}

type FilterBuilder interface {
And(filters ...Filter) Filter
Or(filters ...Filter) Filter
Eq(index string, key string) Filter
}

func NewIndexesFilterBuilder[T any](indexes Indexes[T], fs storage.FS) (FilterBuilder, error) {
indexMap := make(map[IndexName]Index[T])
for name, indexFunc := range indexes {
idx := &index[T]{
name: name.Normalize(),
indexFunc: indexFunc,
fs: fs,
}
indexMap[name] = idx
}
return &chainLensFilterBuilder[T]{
indexes: indexMap,
}, nil
}

type chainLensFilterBuilder[T any] struct {
indexes map[IndexName]Index[T]
}

type chainLensFilter struct {
resultSet *roaring64.Bitmap
}

func (c *chainLensFilter) Eval() FilterIterator {
if c.resultSet == nil {
c.resultSet = roaring64.New()
}

return newFilterIterator(c.resultSet.Clone())
}

func (c *chainLensFilterBuilder[T]) And(filters ...Filter) Filter {
var bmap *roaring64.Bitmap
for _, filter := range filters {
if filter == nil {
continue
}
if _, ok := filter.(*noOpFilter); ok {
continue
}
iter := filter.Eval()
if bmap == nil {
bmap = iter.Bitmap().Clone()
fmt.Println("first bmap", bmap.GetCardinality())
} else {
fmt.Println("iter", iter.Bitmap().GetCardinality())
bmap.And(iter.Bitmap())
}
}
return &chainLensFilter{
resultSet: bmap,
}
}

func (c *chainLensFilterBuilder[T]) Or(filters ...Filter) Filter {
var bmap *roaring64.Bitmap
for _, filter := range filters {
if filter == nil {
continue
}
if _, ok := filter.(*noOpFilter); ok {
continue
}
iter := filter.Eval()
if bmap == nil {
bmap = iter.Bitmap().Clone()
} else {
bmap.Or(iter.Bitmap())
}
}

return &chainLensFilter{
resultSet: bmap,
}
}

func (c *chainLensFilterBuilder[T]) Eq(index string, key string) Filter {
// fetch the index and store it in the result set
index_ := IndexName(index).Normalize()
idx, ok := c.indexes[index_]
if !ok {
return &noOpFilter{}
}

// TODO: what should the context be?
bitmap, err := idx.Fetch(context.Background(), key)
if err != nil {
return &noOpFilter{}
}

return &chainLensFilter{
resultSet: bitmap,
}
}

type noOpFilter struct{}

func (n *noOpFilter) Eval() FilterIterator {
bmap := roaring64.New()
return &filterIterator{
iter: bmap.Iterator(),
bitmap: bmap,
}
}

type filterIterator struct {
iter roaring64.IntPeekable64
bitmap *roaring64.Bitmap
}

func newFilterIterator(bmap *roaring64.Bitmap) FilterIterator {
return &filterIterator{
iter: bmap.Iterator(),
bitmap: bmap,
}
}

func (f *filterIterator) HasNext() bool {
return f.iter.HasNext()
}

func (f *filterIterator) Next() (uint64, uint16) {
// TODO: how to handle if there's no next?
val := f.iter.Next()
return IndexCompoundID(val).Split()
}

func (f *filterIterator) Bitmap() *roaring64.Bitmap {
return f.bitmap
}
125 changes: 125 additions & 0 deletions filter_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package ethwal

import (
"context"
"crypto/sha256"
"fmt"
"strings"

"github.com/0xsequence/ethwal/storage"
"github.com/RoaringBitmap/roaring/v2/roaring64"
)

type IndexFunction[T any] func(block Block[T]) (toIndex bool, indexValues []string, positions []uint16, err error)

type IndexCompoundID uint64

func NewIndexCompoundID(blockNum uint64, dataIndex uint16) IndexCompoundID {
return IndexCompoundID(uint64(blockNum<<16 | uint64(dataIndex)))
}

func (i IndexCompoundID) BlockNumber() uint64 {
return (uint64(i) & 0xFFFFFFFFFFFF0000) >> 16
}

func (i IndexCompoundID) DataIndex() uint16 {
return uint16(i) & 0xFFFF
}

func (i IndexCompoundID) Split() (uint64, uint16) {
return i.BlockNumber(), i.DataIndex()
}

type IndexName string

func (i IndexName) Normalize() IndexName {
return IndexName(strings.ToLower(string(i)))
}

type Indexes[T any] map[IndexName]IndexFunction[T]

type Index[T any] interface {
Fetch(ctx context.Context, key string) (*roaring64.Bitmap, error)
Store(ctx context.Context, block Block[T]) error
Name() IndexName
}

type index[T any] struct {
name IndexName
indexFunc IndexFunction[T]
fs storage.FS
}

var _ Index[any] = (*index[any])(nil)

func (i *index[T]) Fetch(ctx context.Context, indexValue string) (*roaring64.Bitmap, error) {
file, err := newIndexFile(i.fs, i.name, indexValue)
if err != nil {
return nil, fmt.Errorf("failed to open index file: %w", err)
}
bmap, err := file.Read(ctx)
if err != nil {
return nil, err
}

return bmap, nil
}

func (i *index[T]) Store(ctx context.Context, block Block[T]) error {
toIndex, indexValues, indexPositions, err := i.indexFunc(block)
if err != nil {
return fmt.Errorf("failed to index block: %w", err)
}
if !toIndex {
return nil
}

indexValueMap := make(map[string][]IndexCompoundID)
for i, indexValue := range indexValues {
if _, ok := indexValueMap[indexValue]; !ok {
indexValueMap[indexValue] = make([]IndexCompoundID, 0)
}
indexValueMap[indexValue] = append(indexValueMap[indexValue], NewIndexCompoundID(block.Number, indexPositions[i]))
}

for indexValue, indexIDs := range indexValueMap {
file, err := newIndexFile(i.fs, i.name, indexValue)
if err != nil {
return fmt.Errorf("failed to open or create index file: %w", err)
}
bmap, err := file.Read(ctx)
if err != nil {
return err
}
for _, indexID := range indexIDs {
bmap.Add(uint64(indexID))
}
err = file.Write(ctx, bmap)
if err != nil {
return err
}
}

return nil
}

func (i *index[T]) Name() IndexName {
return i.name
}

func indexPath(index string, indexValue string) string {
h := sha256.New()
h.Write([]byte(indexValue))
hashed := h.Sum(nil)
hashString := fmt.Sprintf("%x", hashed)
dividedLen := int(len(hashString) / 4)
indexValues := make([]string, 4)
for i := 0; i < 4; i++ {
if i == 3 {
indexValues[i] = hashString[i*dividedLen:]
continue
}
indexValues[i] = hashString[i*dividedLen : (i+1)*dividedLen]
}
return fmt.Sprintf("%s/%s/%s/%s/%s", index, indexValues[0], indexValues[1], indexValues[2], indexValues[3])
}
60 changes: 60 additions & 0 deletions filter_index_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package ethwal

import (
"context"
"fmt"

"github.com/0xsequence/ethwal/storage"
"github.com/RoaringBitmap/roaring/v2/roaring64"
)

type indexFile struct {
fs storage.FS
path string
}

func newIndexFile(fs storage.FS, indexName IndexName, key string) (*indexFile, error) {
path := indexPath(string(indexName), key)
return &indexFile{fs: fs, path: path}, nil
}

func (i *indexFile) Read(ctx context.Context) (*roaring64.Bitmap, error) {
file, err := i.fs.Open(ctx, i.path, nil)
if err != nil {
// TODO: decide if we should report an error or just create a new roaring bitmap...
// with this approach we are not reporting an error if the file does not exist
// and we just write the new bitmap when write is called...
// return nil, fmt.Errorf("failed to open index file: %w", err)
return roaring64.New(), nil
}
defer file.Close()
var buf []byte = make([]byte, file.Size)
_, err = file.Read(buf)
if err != nil {
return nil, fmt.Errorf("failed to read index file: %w", err)
}
bmap := roaring64.New()
err = bmap.UnmarshalBinary(buf)

if err != nil {
return nil, fmt.Errorf("failed to unmarshal bitmap: %w", err)
}
return bmap, nil
}

func (i *indexFile) Write(ctx context.Context, bmap *roaring64.Bitmap) error {
file, err := i.fs.Create(ctx, i.path, nil)
if err != nil {
return fmt.Errorf("failed to open index file: %w", err)
}
defer file.Close()
data, err := bmap.ToBytes()
if err != nil {
return fmt.Errorf("failed to marshal bitmap: %w", err)
}
_, err = file.Write(data)
if err != nil {
return fmt.Errorf("failed to write bitmap: %w", err)
}
return nil
}
55 changes: 55 additions & 0 deletions filter_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package ethwal

import (
"context"
"io"
)

type chainLensReader[T any] struct {
reader Reader[T]
filter Filter
iterator FilterIterator
}

var _ Reader[any] = (*chainLensReader[any])(nil)

func NewChainLensReader[T any](reader Reader[T], filter Filter) (Reader[T], error) {
return &chainLensReader[T]{
reader: reader,
filter: filter,
iterator: filter.Eval(),
}, nil
}

func (c *chainLensReader[T]) FilesNum() int {
return c.reader.FilesNum()
}

func (c *chainLensReader[T]) Seek(ctx context.Context, blockNum uint64) error {
// TODO: how should seek function?
return c.reader.Seek(ctx, blockNum)
}

func (c *chainLensReader[T]) BlockNum() uint64 {
// TODO: INCOMPLETE
return c.reader.BlockNum()
}

func (c *chainLensReader[T]) Read(ctx context.Context) (Block[T], error) {
if !c.iterator.HasNext() {
return Block[T]{}, io.EOF
}

// TODO: decide about the index what to do??
blockNum, _ := c.iterator.Next()
err := c.reader.Seek(ctx, blockNum)
if err != nil {
return Block[T]{}, err
}

return c.reader.Read(ctx)
}

func (c *chainLensReader[T]) Close() error {
return c.reader.Close()
}
Loading

0 comments on commit 2c9d4fe

Please sign in to comment.