Skip to content

Commit

Permalink
node/meta: move block related things in a separate file
Browse files Browse the repository at this point in the history
Also rename `container.go` to `containers.go` since we already have `blocks.go`
and `notifications.go` with the same meaning.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Feb 21, 2025
1 parent 96ef86e commit f85af2f
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 88 deletions.
101 changes: 101 additions & 0 deletions pkg/services/meta/blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package meta

import (
"context"
"fmt"

"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"go.uber.org/zap"
)

func (m *Meta) handleBlock(b *block.Header) error {
h := b.Hash()
ind := b.Index
l := m.l.With(zap.Stringer("block hash", h), zap.Uint32("index", ind))
l.Debug("handling block")

evName := objPutEvName
m.cliM.RLock()
res, err := m.ws.GetBlockNotifications(h, &neorpc.NotificationFilter{
Contract: &m.cnrH,
Name: &evName,
})
if err != nil {
m.cliM.RUnlock()
return fmt.Errorf("fetching %s block: %w", h, err)
}

Check warning on line 28 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L26-L28

Added lines #L26 - L28 were not covered by tests
m.cliM.RUnlock()

if len(res.Application) == 0 {
return nil
}

Check warning on line 33 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L32-L33

Added lines #L32 - L33 were not covered by tests

m.m.RLock()
defer m.m.RUnlock()

for _, n := range res.Application {
ev, err := parseObjNotification(n)
if err != nil {
l.Error("invalid object notification received", zap.Error(err))
continue

Check warning on line 42 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L41-L42

Added lines #L41 - L42 were not covered by tests
}

s, ok := m.storages[ev.cID]
if !ok {
l.Debug("skipping object notification", zap.Stringer("inactual container", ev.cID))
continue

Check warning on line 48 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L47-L48

Added lines #L47 - L48 were not covered by tests
}

err = m.handleObjectNotification(s, ev)
if err != nil {
l.Error("handling object notification", zap.Error(err))
continue

Check warning on line 54 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}

l.Debug("handled object notification successfully", zap.Stringer("cID", ev.cID), zap.Stringer("oID", ev.oID))
}

for _, st := range m.storages {
// TODO: parallelize depending on what can parallelize well

st.m.Lock()

root := st.mpt.StateRoot()
st.mpt.Store.Put([]byte{rootKey}, root[:])
p := st.path
if st.opsBatch != nil {
_, err := st.mpt.PutBatch(mpt.MapToMPTBatch(st.opsBatch))
if err != nil {
st.m.Unlock()
return fmt.Errorf("put batch for %d block to %q storage: %w", ind, p, err)
}

Check warning on line 73 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L71-L73

Added lines #L71 - L73 were not covered by tests

st.opsBatch = nil
}

st.m.Unlock()

st.mpt.Flush(ind)
}

l.Debug("handled block successfully")

return nil
}

func (m *Meta) blockFetcher(ctx context.Context, buff <-chan *block.Header) {
for {
select {
case <-ctx.Done():
return
case b := <-buff:
err := m.handleBlock(b)
if err != nil {
m.l.Error("block handling failed", zap.Error(err))
continue

Check warning on line 97 in pkg/services/meta/blocks.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/blocks.go#L96-L97

Added lines #L96 - L97 were not covered by tests
}
}
}
}
File renamed without changes.
88 changes: 0 additions & 88 deletions pkg/services/meta/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
Expand Down Expand Up @@ -229,93 +228,6 @@ outer:
return cli, nil
}

func (m *Meta) handleBlock(b *block.Header) error {
h := b.Hash()
ind := b.Index
l := m.l.With(zap.Stringer("block hash", h), zap.Uint32("index", ind))
l.Debug("handling block")

evName := objPutEvName
res, err := m.ws.GetBlockNotifications(h, &neorpc.NotificationFilter{
Contract: &m.cnrH,
Name: &evName,
})
if err != nil {
return fmt.Errorf("fetching %s block: %w", h, err)
}

if len(res.Application) == 0 {
return nil
}

m.m.RLock()
defer m.m.RUnlock()

for _, n := range res.Application {
ev, err := parseObjNotification(n)
if err != nil {
l.Error("invalid object notification received", zap.Error(err))
continue
}

s, ok := m.storages[ev.cID]
if !ok {
l.Debug("skipping object notification", zap.Stringer("inactual container", ev.cID))
continue
}

err = m.handleObjectNotification(s, ev)
if err != nil {
l.Error("handling object notification", zap.Error(err))
continue
}

l.Debug("handled object notification successfully", zap.Stringer("cID", ev.cID), zap.Stringer("oID", ev.oID))
}

for _, st := range m.storages {
// TODO: parallelize depending on what can parallelize well

st.m.Lock()

root := st.mpt.StateRoot()
st.mpt.Store.Put([]byte{rootKey}, root[:])
p := st.path
if st.opsBatch != nil {
_, err := st.mpt.PutBatch(mpt.MapToMPTBatch(st.opsBatch))
if err != nil {
st.m.Unlock()
return fmt.Errorf("put batch for %d block to %q storage: %w", ind, p, err)
}

st.opsBatch = nil
}

st.m.Unlock()

st.mpt.Flush(ind)
}

l.Debug("handled block successfully")

return nil
}

func (m *Meta) blockFetcher(ctx context.Context, buff <-chan *block.Header) {
for {
select {
case <-ctx.Done():
return
case b := <-buff:
err := m.handleBlock(b)
if err != nil {
m.l.Error("block handling failed", zap.Error(err))
continue
}
}
}
}

const (
// MPT key prefixes.
oidIndex = iota
Expand Down

0 comments on commit f85af2f

Please sign in to comment.