Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node/meta: Use neo-go's new notification API #3164

Merged
merged 3 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions pkg/services/meta/blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package meta

import (
"context"
"fmt"

"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"go.uber.org/zap"
)

func (m *Meta) handleBlock(b *block.Header) error {
h := b.Hash()
ind := b.Index
l := m.l.With(zap.Stringer("block hash", h), zap.Uint32("index", ind))
l.Debug("handling block")

evName := objPutEvName
m.cliM.RLock()
res, err := m.ws.GetBlockNotifications(h, &neorpc.NotificationFilter{
Contract: &m.cnrH,
Name: &evName,
})
if err != nil {
m.cliM.RUnlock()
return fmt.Errorf("fetching %s block: %w", h, err)
}

Check warning on line 28 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L26-L28

Added lines #L26 - L28 were not covered by tests
m.cliM.RUnlock()

if len(res.Application) == 0 {
return nil
}

Check warning on line 33 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L32-L33

Added lines #L32 - L33 were not covered by tests

m.m.RLock()
defer m.m.RUnlock()

for _, n := range res.Application {
ev, err := parseObjNotification(n)
if err != nil {
l.Error("invalid object notification received", zap.Error(err))
continue

Check warning on line 42 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L41-L42

Added lines #L41 - L42 were not covered by tests
}

s, ok := m.storages[ev.cID]
if !ok {
l.Debug("skipping object notification", zap.Stringer("inactual container", ev.cID))
continue

Check warning on line 48 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L47-L48

Added lines #L47 - L48 were not covered by tests
}

err = m.handleObjectNotification(s, ev)
if err != nil {
l.Error("handling object notification", zap.Error(err))
continue

Check warning on line 54 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}

l.Debug("handled object notification successfully", zap.Stringer("cID", ev.cID), zap.Stringer("oID", ev.oID))
}

for _, st := range m.storages {
// TODO: parallelize depending on what can parallelize well

st.m.Lock()

root := st.mpt.StateRoot()
st.mpt.Store.Put([]byte{rootKey}, root[:])
p := st.path
if st.opsBatch != nil {
_, err := st.mpt.PutBatch(mpt.MapToMPTBatch(st.opsBatch))
if err != nil {
st.m.Unlock()
return fmt.Errorf("put batch for %d block to %q storage: %w", ind, p, err)
}

Check warning on line 73 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L71-L73

Added lines #L71 - L73 were not covered by tests

st.opsBatch = nil
}

st.m.Unlock()

st.mpt.Flush(ind)
}

l.Debug("handled block successfully")

return nil
}

func (m *Meta) blockFetcher(ctx context.Context, buff <-chan *block.Header) {
for {
select {
case <-ctx.Done():
return
case b := <-buff:
err := m.handleBlock(b)
if err != nil {
m.l.Error("block handling failed", zap.Error(err))
continue

Check warning on line 97 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L96-L97

Added lines #L96 - L97 were not covered by tests
}
}
}
}
File renamed without changes.
56 changes: 35 additions & 21 deletions pkg/services/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/util"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"go.uber.org/zap"
Expand All @@ -32,6 +33,17 @@
List() (map[cid.ID]struct{}, error)
}

// wsClient is for test purposes only.
type wsClient interface {
GetBlockNotifications(blockHash util.Uint256, filters ...*neorpc.NotificationFilter) (*result.BlockNotifications, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest to write docs incl. what behavior Meta expects for blocks w/o notifications - error of both nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what docs? this is literally simplification for tests. if there is either a test http server that can be mocked or we do not use tests at all, this interface does not exist. this is literally rpcclient.WSClient. docs will not save us if neo-go decides to change behavior. i do not plan to have any self written implementation here ever

Copy link
Contributor

@cthulhu-rider cthulhu-rider Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ik why it's needed, idk how GetBlockNotifications interface behaves in mentioned case and wanna highlight this

neo-go decides to change behavior

it definitely wont change the behavior intentionally

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will some reference to rpcclient.WSClient be enough in the comment? cause i do not understand you. nothing should implement it and it declares nothing for no one, it is literally rpcclient.WSClient but it is hard to mock it for tests. and i prefer this interface not to exist, but it would decrease coverage. i believe we will come to httptest package usage but not now

Copy link
Contributor

@cthulhu-rider cthulhu-rider Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, u may state that this is a part of WSClient interface consumed by Meta

but 1st of all i wanna realize what is returned - 404 error, both nil or non-nil with empty fields?

GetVersion() (*result.Version, error)

ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error)
ReceiveExecutionNotifications(flt *neorpc.NotificationFilter, rcvr chan<- *state.ContainedNotificationEvent) (string, error)

Close()
}

// Meta handles object meta information received from FS chain and object
// storages. Chain information is stored in Merkle-Patricia Tries. Full objects
// index is built and stored as a simple KV storage.
Expand All @@ -47,21 +59,21 @@

timeout time.Duration
magicNumber uint32
ws *rpcclient.WSClient
cliM sync.RWMutex
ws wsClient
bCh chan *block.Header
objEv chan *state.ContainedNotificationEvent
cnrDelEv chan *state.ContainedNotificationEvent
cnrPutEv chan *state.ContainedNotificationEvent
epochEv chan *state.ContainedNotificationEvent

objNotificationBuff chan *state.ContainedNotificationEvent
blockBuff chan *block.Header

// runtime reload fields
cfgM sync.RWMutex
endpoints []string
}

const objsBufferSize = 1024
const blockBuffSize = 1024

// Parameters groups arguments for [New] call.
type Parameters struct {
Expand Down Expand Up @@ -136,20 +148,19 @@
}

return &Meta{
l: p.Logger,
rootPath: p.RootPath,
netmapH: p.NetmapHash,
cnrH: p.ContainerHash,
cLister: p.ContainerLister,
endpoints: p.NeoEnpoints,
timeout: p.Timeout,
bCh: make(chan *block.Header),
objEv: make(chan *state.ContainedNotificationEvent),
cnrDelEv: make(chan *state.ContainedNotificationEvent),
cnrPutEv: make(chan *state.ContainedNotificationEvent),
epochEv: make(chan *state.ContainedNotificationEvent),
objNotificationBuff: make(chan *state.ContainedNotificationEvent, objsBufferSize),
storages: storages}, nil
l: p.Logger,
rootPath: p.RootPath,
netmapH: p.NetmapHash,
cnrH: p.ContainerHash,
cLister: p.ContainerLister,
endpoints: p.NeoEnpoints,
timeout: p.Timeout,
bCh: make(chan *block.Header),
cnrDelEv: make(chan *state.ContainedNotificationEvent),
cnrPutEv: make(chan *state.ContainedNotificationEvent),
epochEv: make(chan *state.ContainedNotificationEvent),
blockBuff: make(chan *block.Header, blockBuffSize),
storages: storages}, nil

Check warning on line 163 in pkg/services/meta/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/meta.go#L151-L163

Added lines #L151 - L163 were not covered by tests
}

// Reload updates service in runtime.
Expand Down Expand Up @@ -198,13 +209,16 @@
}

go m.flusher(ctx)
go m.objNotificationWorker(ctx, m.objNotificationBuff)
go m.blockFetcher(ctx, m.blockBuff)

Check warning on line 212 in pkg/services/meta/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/meta.go#L212

Added line #L212 was not covered by tests

return m.listenNotifications(ctx)
}

func (m *Meta) flusher(ctx context.Context) {
const flushInterval = time.Second
const (
flushInterval = time.Second
collapseDepth = 10
)

t := time.NewTicker(flushInterval)
defer t.Stop()
Expand Down
115 changes: 7 additions & 108 deletions pkg/services/meta/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
"time"

"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
Expand All @@ -32,12 +31,6 @@
return fmt.Errorf("subscribe for block headers: %w", err)
}

objEv := objPutEvName
_, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &objEv}, m.objEv)
if err != nil {
return fmt.Errorf("subscribe for object notifications: %w", err)
}

cnrDeleteEv := cnrDeleteName
_, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &cnrDeleteEv}, m.cnrDelEv)
if err != nil {
Expand Down Expand Up @@ -72,26 +65,7 @@
continue
}

go func() {
err := m.handleBlock(h.Index)
if err != nil {
m.l.Error(fmt.Sprintf("processing %d block", h.Index), zap.Error(err))
return
}
}()
case aer, ok := <-m.objEv:
if !ok {
err := m.reconnect(ctx)
if err != nil {
return err
}

continue
}

// TODO: https://github.com/nspcc-dev/neo-go/issues/3779 receive somehow notifications from blocks

m.objNotificationBuff <- aer
m.blockBuff <- h
case aer, ok := <-m.cnrDelEv:
if !ok {
err := m.reconnect(ctx)
Expand Down Expand Up @@ -192,14 +166,16 @@
func (m *Meta) reconnect(ctx context.Context) error {
m.l.Warn("reconnecting to web socket client due to connection lost")

m.cliM.Lock()
defer m.cliM.Unlock()

Check warning on line 171 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L169-L171

Added lines #L169 - L171 were not covered by tests
var err error
m.ws, err = m.connect(ctx)
if err != nil {
return fmt.Errorf("reconnecting to web socket: %w", err)
}

m.bCh = make(chan *block.Header)
m.objEv = make(chan *state.ContainedNotificationEvent)
m.cnrDelEv = make(chan *state.ContainedNotificationEvent)
m.cnrPutEv = make(chan *state.ContainedNotificationEvent)
m.epochEv = make(chan *state.ContainedNotificationEvent)
Expand Down Expand Up @@ -252,80 +228,6 @@
return cli, nil
}

const (
collapseDepth = 10
)

func (m *Meta) handleBlock(ind uint32) error {
l := m.l.With(zap.Uint32("block", ind))
l.Debug("handling block")

m.m.RLock()
defer m.m.RUnlock()

for _, st := range m.storages {
// TODO: parallelize depending on what can parallelize well

st.m.Lock()

root := st.mpt.StateRoot()
st.mpt.Store.Put([]byte{rootKey}, root[:])
p := st.path
if st.opsBatch != nil {
_, err := st.mpt.PutBatch(mpt.MapToMPTBatch(st.opsBatch))
if err != nil {
st.m.Unlock()
return fmt.Errorf("put batch for %d block to %q storage: %w", ind, p, err)
}

st.opsBatch = nil
}

st.m.Unlock()

st.mpt.Flush(ind)
}

// TODO drop containers that node does not belong to anymore?

l.Debug("handled block successfully")

return nil
}

func (m *Meta) objNotificationWorker(ctx context.Context, ch <-chan *state.ContainedNotificationEvent) {
for {
select {
case <-ctx.Done():
return
case n := <-ch:
l := m.l.With(zap.Stringer("notification container", n.Container))

ev, err := parseObjNotification(n)
if err != nil {
l.Error("invalid object notification received", zap.Error(err))
continue
}

m.m.RLock()
_, ok := m.storages[ev.cID]
m.m.RUnlock()
if !ok {
l.Debug("skipping object notification", zap.Stringer("inactual container", ev.cID))
continue
}

err = m.handleObjectNotification(ev)
if err != nil {
l.Error("handling object notification", zap.Error(err))
return
}

l.Debug("handled object notification successfully", zap.Stringer("cID", ev.cID), zap.Stringer("oID", ev.oID))
}
}
}

const (
// MPT key prefixes.
oidIndex = iota
Expand Down Expand Up @@ -365,7 +267,7 @@
typ objectsdk.Type
}

func parseObjNotification(ev *state.ContainedNotificationEvent) (objEvent, error) {
func parseObjNotification(ev state.ContainedNotificationEvent) (objEvent, error) {
const expectedNotificationArgs = 3
var res objEvent

Expand Down Expand Up @@ -487,15 +389,12 @@
return m.Value().([]stackitem.MapElement)[i].Value
}

func (m *Meta) handleObjectNotification(e objEvent) error {
func (m *Meta) handleObjectNotification(s *containerStorage, e objEvent) error {
if magic := uint32(e.network.Uint64()); magic != m.magicNumber {
return fmt.Errorf("wrong magic number %d, expected: %d", magic, m.magicNumber)
}

m.m.RLock()
defer m.m.RUnlock()

err := m.storages[e.cID].putObject(e)
err := s.putObject(e)
if err != nil {
return err
}
Expand Down
Loading
Loading