Skip to content
This repository was archived by the owner on May 13, 2022. It is now read-only.

Commit a4c0f43

Browse files
author
Casey Kuhlman
authored
Merge pull request #1452 from hyperledger/eth-consumer
Add continuity test to vent
2 parents 0708b48 + 295170c commit a4c0f43

File tree

25 files changed

+10369
-180
lines changed

25 files changed

+10369
-180
lines changed

Makefile

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ build_burrow_debug:
169169
.PHONY: install
170170
install: build_burrow
171171
mkdir -p ${BIN_PATH}
172-
install -T ${REPO}/bin/burrow ${BIN_PATH}/burrow
172+
install ${REPO}/bin/burrow ${BIN_PATH}/burrow
173173

174174
# build burrow with checks for race conditions
175175
.PHONY: build_race_db
@@ -262,9 +262,10 @@ start_ganache: $(PID_DIR)/ganache.pid
262262
stop_ganache: $(PID_DIR)/ganache.pid
263263
@kill $(shell cat $<) && echo "Ganache process stopped." && rm $< || rm $<
264264

265+
# For local debug
265266
.PHONY: postgres
266267
postgres:
267-
docker-compose up
268+
docker run -e POSTGRES_HOST_AUTH_METHOD=trust -p 5432:5432 postgres:11-alpine
268269

269270
.PHONY: test_restore
270271
test_restore:

cmd/burrow/commands/dump.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func Dump(output Output) func(cmd *cli.Cmd) {
7171
}
7272

7373
// Include all logging by default
74-
logger, err := logconfig.New().NewLogger()
74+
logger, err := logconfig.New().Logger()
7575
if err != nil {
7676
output.Fatalf("could not make logger: %v", err)
7777
}

cmd/burrow/commands/vent.go

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,26 @@ import (
2121
cli "github.com/jawher/mow.cli"
2222
)
2323

24+
type LogLevel string
25+
26+
const (
27+
LogLevelNone LogLevel = "none"
28+
LogLevelInfo LogLevel = "info"
29+
LogLevelTrace LogLevel = "trace"
30+
)
31+
32+
func logConfig(level LogLevel) *logconfig.LoggingConfig {
33+
logConf := logconfig.New()
34+
switch level {
35+
case LogLevelNone:
36+
return logConf.None()
37+
case LogLevelTrace:
38+
return logConf.WithTrace()
39+
default:
40+
return logConf
41+
}
42+
}
43+
2444
// Vent consumes EVM events and commits to a DB
2545
func Vent(output Output) func(cmd *cli.Cmd) {
2646
return func(cmd *cli.Cmd) {
@@ -32,9 +52,14 @@ func Vent(output Output) func(cmd *cli.Cmd) {
3252
dbOpts := sqlDBOpts(cmd, cfg)
3353
grpcAddrOpt := cmd.StringOpt("chain-addr", cfg.ChainAddress, "Address to connect to the Hyperledger Burrow gRPC server")
3454
httpAddrOpt := cmd.StringOpt("http-addr", cfg.HTTPListenAddress, "Address to bind the HTTP server")
35-
logLevelOpt := cmd.StringOpt("log-level", cfg.LogLevel, "Logging level (error, warn, info, debug)")
55+
logLevelOpt := cmd.StringOpt("log-level", string(LogLevelInfo), "Logging level (none, info, trace)")
3656
watchAddressesOpt := cmd.StringsOpt("watch", nil, "Add contract address to global watch filter")
3757
minimumHeightOpt := cmd.IntOpt("minimum-height", 0, "Only process block greater than or equal to height passed")
58+
maxRetriesOpt := cmd.IntOpt("max-retries", int(cfg.BlockConsumerConfig.MaxRetries), "Maximum number of retries when consuming blocks")
59+
backoffDurationOpt := cmd.StringOpt("backoff", "",
60+
"The minimum duration to wait before asking for new blocks - increases exponentially when errors occur. Values like 200ms, 1s, 2m")
61+
batchSizeOpt := cmd.IntOpt("batch-size", int(cfg.BlockConsumerConfig.MaxBlockBatchSize),
62+
"The maximum number of blocks from which to request events in a single call - will reduce logarithmically to 1 when errors occur.")
3863
abiFileOpt := cmd.StringsOpt("abi", cfg.AbiFileOrDirs, "EVM Contract ABI file or folder")
3964
specFileOrDirOpt := cmd.StringsOpt("spec", cfg.SpecFileOrDirs, "SQLSol specification file or folder")
4065
dbBlockOpt := cmd.BoolOpt("blocks", false, "Create block tables and persist related data")
@@ -43,16 +68,21 @@ func Vent(output Output) func(cmd *cli.Cmd) {
4368
announceEveryOpt := cmd.StringOpt("announce-every", "5s", "Announce vent status every period as a Go duration, e.g. 1ms, 3s, 1h")
4469

4570
cmd.Before = func() {
71+
var err error
4672
// Rather annoying boilerplate here... but there is no way to pass mow.cli a pointer for it to fill you value
4773
cfg.DBAdapter = *dbOpts.adapter
4874
cfg.DBURL = *dbOpts.url
4975
cfg.DBSchema = *dbOpts.schema
5076
cfg.ChainAddress = *grpcAddrOpt
5177
cfg.HTTPListenAddress = *httpAddrOpt
52-
cfg.LogLevel = *logLevelOpt
5378
cfg.WatchAddresses = make([]crypto.Address, len(*watchAddressesOpt))
5479
cfg.MinimumHeight = uint64(*minimumHeightOpt)
55-
var err error
80+
cfg.BlockConsumerConfig.MaxRetries = uint64(*maxRetriesOpt)
81+
cfg.BlockConsumerConfig.BaseBackoffDuration, err = parseDuration(*backoffDurationOpt)
82+
if err != nil {
83+
output.Fatalf("could not parse backoff duration: %w", err)
84+
}
85+
cfg.BlockConsumerConfig.MaxBlockBatchSize = uint64(*batchSizeOpt)
5686
for i, wa := range *watchAddressesOpt {
5787
cfg.WatchAddresses[i], err = crypto.AddressFromHexString(wa)
5888
if err != nil {
@@ -68,29 +98,31 @@ func Vent(output Output) func(cmd *cli.Cmd) {
6898
cfg.SpecOpt |= sqlsol.Tx
6999
}
70100

71-
if *announceEveryOpt != "" {
72-
var err error
73-
cfg.AnnounceEvery, err = time.ParseDuration(*announceEveryOpt)
74-
if err != nil {
75-
output.Fatalf("could not parse announce-every duration %s: %v", *announceEveryOpt, err)
76-
}
101+
cfg.AnnounceEvery, err = parseDuration(*announceEveryOpt)
102+
if err != nil {
103+
output.Fatalf("could not parse announce-every duration %s: %v", *announceEveryOpt, err)
77104
}
78105
}
79106

80107
cmd.Spec = "--spec=<spec file or dir>... [--abi=<abi file or dir>...] " +
81108
"[--watch=<contract address>...] [--minimum-height=<lowest height from which to read>] " +
109+
"[--max-retries=<max block request retries>] [--backoff=<minimum backoff duration>] " +
110+
"[--batch-size=<minimum block batch size>] " +
82111
"[--db-adapter] [--db-url] [--db-schema] [--blocks] [--txs] [--chain-addr] [--http-addr] " +
83112
"[--log-level] [--announce-every=<duration>]"
84113

85114
cmd.Action = func() {
86-
log, err := logconfig.New().NewLogger()
115+
logger, err := logConfig(LogLevel(*logLevelOpt)).Logger()
87116
if err != nil {
88117
output.Fatalf("failed to load logger: %v", err)
89118
}
90119

91-
log = log.With("service", "vent")
92-
consumer := service.NewConsumer(cfg, log, make(chan types.EventData))
93-
server := service.NewServer(cfg, log, consumer)
120+
logger = logger.With("service", "vent")
121+
consumer := service.NewConsumer(cfg, logger, make(chan types.EventData))
122+
if err != nil {
123+
output.Fatalf("Could not create Vent Consumer: %v", err)
124+
}
125+
server := service.NewServer(cfg, logger, consumer)
94126

95127
projection, err := sqlsol.SpecLoader(cfg.SpecFileOrDirs, cfg.SpecOpt)
96128
if err != nil {
@@ -194,7 +226,7 @@ func Vent(output Output) func(cmd *cli.Cmd) {
194226
}
195227

196228
cmd.Action = func() {
197-
log, err := logconfig.New().NewLogger()
229+
log, err := logconfig.New().Logger()
198230
if err != nil {
199231
output.Fatalf("failed to load logger: %v", err)
200232
}
@@ -230,6 +262,13 @@ func Vent(output Output) func(cmd *cli.Cmd) {
230262
}
231263
}
232264

265+
func parseDuration(duration string) (time.Duration, error) {
266+
if duration == "" {
267+
return 0, nil
268+
}
269+
return time.ParseDuration(duration)
270+
}
271+
233272
type dbOpts struct {
234273
adapter *string
235274
url *string

core/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (kern *Kernel) LoadKeysFromConfig(conf *keys.KeysConfig) (err error) {
3434

3535
// LoadLoggerFromConfig adds a logging configuration to the kernel
3636
func (kern *Kernel) LoadLoggerFromConfig(conf *logconfig.LoggingConfig) error {
37-
logger, err := conf.NewLogger()
37+
logger, err := conf.Logger()
3838
kern.SetLogger(logger)
3939
return err
4040
}

crypto/signature_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package crypto
2+
3+
import (
4+
"fmt"
5+
"math/big"
6+
"testing"
7+
)
8+
9+
func TestGetEthChainID(t *testing.T) {
10+
chainIDString := "BurrowChain_FAB3C1-AB0FD1"
11+
chainID := GetEthChainID(chainIDString)
12+
b := new(big.Int).SetBytes([]byte(chainIDString))
13+
fmt.Println(b)
14+
fmt.Println(chainID)
15+
fmt.Printf("%X", chainID.Bytes())
16+
}

logging/logconfig/config.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ func New() *LoggingConfig {
4343
}
4444
}
4545

46+
func (lc *LoggingConfig) WithTrace() *LoggingConfig {
47+
lc.Trace = true
48+
return lc
49+
}
50+
51+
func (lc *LoggingConfig) None() *LoggingConfig {
52+
lc.RootSink = nil
53+
return lc
54+
}
55+
4656
func (lc *LoggingConfig) Root(configure func(sink *SinkConfig) *SinkConfig) *LoggingConfig {
4757
lc.RootSink = configure(Sink())
4858
return lc
@@ -65,8 +75,16 @@ func (lc *LoggingConfig) JSONString() string {
6575
return JSONString(lc)
6676
}
6777

78+
func (lc *LoggingConfig) MustLogger() *logging.Logger {
79+
logger, err := lc.Logger()
80+
if err != nil {
81+
panic(err)
82+
}
83+
return logger
84+
}
85+
6886
// Obtain a logger from this LoggingConfig
69-
func (lc *LoggingConfig) NewLogger() (*logging.Logger, error) {
87+
func (lc *LoggingConfig) Logger() (*logging.Logger, error) {
7088
outputLogger, errCh, err := newLogger(lc)
7189
if err != nil {
7290
return nil, err

logging/logconfig/sinks.go

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/eapache/channels"
88
"github.com/go-kit/kit/log"
9+
"github.com/hyperledger/burrow/logging"
910
"github.com/hyperledger/burrow/logging/loggers"
1011
"github.com/hyperledger/burrow/logging/structure"
1112
)
@@ -159,19 +160,42 @@ func Sink() *SinkConfig {
159160
return &SinkConfig{}
160161
}
161162

162-
func (sinkConfig *SinkConfig) AddSinks(sinks ...*SinkConfig) *SinkConfig {
163-
sinkConfig.Sinks = append(sinkConfig.Sinks, sinks...)
164-
return sinkConfig
163+
func (sc *SinkConfig) MustLogger() *logging.Logger {
164+
return sc.LoggingConfig().MustLogger()
165165
}
166166

167-
func (sinkConfig *SinkConfig) SetTransform(transform *TransformConfig) *SinkConfig {
168-
sinkConfig.Transform = transform
169-
return sinkConfig
167+
func (sc *SinkConfig) Logger() (*logging.Logger, error) {
168+
return sc.LoggingConfig().Logger()
170169
}
171170

172-
func (sinkConfig *SinkConfig) SetOutput(output *OutputConfig) *SinkConfig {
173-
sinkConfig.Output = output
174-
return sinkConfig
171+
// Wrap this sink as RootSink of LoggingCOnfig
172+
func (sc *SinkConfig) LoggingConfig() *LoggingConfig {
173+
lc := New()
174+
lc.RootSink = sc
175+
return lc
176+
}
177+
178+
func (sc *SinkConfig) AddSinks(sinks ...*SinkConfig) *SinkConfig {
179+
sc.Sinks = append(sc.Sinks, sinks...)
180+
return sc
181+
}
182+
183+
func (sc *SinkConfig) SetTransform(transform *TransformConfig) *SinkConfig {
184+
sc.Transform = transform
185+
return sc
186+
}
187+
188+
func (sc *SinkConfig) FilterScope(scope string) *SinkConfig {
189+
return sc.SetTransform(FilterTransform(IncludeWhenAllMatch, structure.ScopeKey, scope))
190+
}
191+
192+
func (sc *SinkConfig) Terminal() *SinkConfig {
193+
return sc.SetOutput(StderrOutput().SetFormat(TerminalFormat))
194+
}
195+
196+
func (sc *SinkConfig) SetOutput(output *OutputConfig) *SinkConfig {
197+
sc.Output = output
198+
return sc
175199
}
176200

177201
func (outputConfig *OutputConfig) SetFormat(format string) *OutputConfig {
@@ -298,8 +322,11 @@ func VectoriseTransform() *TransformConfig {
298322
}
299323

300324
// Logger formation
301-
func (sinkConfig *SinkConfig) BuildLogger() (log.Logger, map[string]*loggers.CaptureLogger, error) {
302-
return BuildLoggerFromSinkConfig(sinkConfig, make(map[string]*loggers.CaptureLogger))
325+
func (sc *SinkConfig) BuildLogger() (log.Logger, map[string]*loggers.CaptureLogger, error) {
326+
if sc == nil {
327+
return log.NewNopLogger(), nil, nil
328+
}
329+
return BuildLoggerFromSinkConfig(sc, make(map[string]*loggers.CaptureLogger))
303330
}
304331

305332
func BuildLoggerFromSinkConfig(sinkConfig *SinkConfig, captures map[string]*loggers.CaptureLogger) (log.Logger,

rpc/lib/rpc_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func TestMain(m *testing.M) {
7878

7979
// launch unix and tcp servers
8080
func setup() {
81-
logger, err := logconfig.New().NewLogger()
81+
logger, err := logconfig.New().Logger()
8282
if err != nil {
8383
panic(err)
8484
}

0 commit comments

Comments
 (0)