Skip to content

Commit 5f10f29

Browse files
committed
Tidy up packages, settings, database connection
1 parent 89a4603 commit 5f10f29

File tree

8 files changed

+108
-65
lines changed

8 files changed

+108
-65
lines changed

cmd/contract-event-processor/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import (
66
"os"
77
"strconv"
88

9-
services "event-stream/internal"
9+
"github.com/DIMO-Network/contract-event-processor/internal/config"
10+
"github.com/DIMO-Network/contract-event-processor/internal/services"
1011

1112
"github.com/DIMO-Network/shared"
1213
"github.com/Shopify/sarama"
@@ -18,7 +19,7 @@ import (
1819

1920
func main() {
2021
logger := zerolog.New(os.Stdout).With().Timestamp().Str("app", "event-stream-processor").Logger()
21-
settings, err := shared.LoadConfig[services.Settings]("settings.yaml")
22+
settings, err := shared.LoadConfig[config.Settings]("settings.yaml")
2223
if err != nil {
2324
logger.Fatal().Err(err)
2425
}

go.mod

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1-
module event-stream
1+
module github.com/DIMO-Network/contract-event-processor
22

33
go 1.19
44

55
require (
66
github.com/Shopify/sarama v1.33.0
77
github.com/ethereum/go-ethereum v1.10.26
88
github.com/friendsofgo/errors v0.9.2
9+
github.com/gofiber/adaptor/v2 v2.1.31
10+
github.com/gofiber/fiber/v2 v2.41.0
911
github.com/lib/pq v1.10.7
1012
github.com/pressly/goose/v3 v3.7.0
13+
github.com/prometheus/client_golang v1.12.1
1114
github.com/volatiletech/null/v8 v8.1.2
1215
github.com/volatiletech/sqlboiler/v4 v4.14.0
1316
github.com/volatiletech/strmangle v0.0.4
@@ -16,17 +19,21 @@ require (
1619

1720
require (
1821
github.com/Shopify/toxiproxy/v2 v2.4.0 // indirect
22+
github.com/andybalholm/brotli v1.0.4 // indirect
1923
github.com/avast/retry-go/v4 v4.1.0 // indirect
2024
github.com/aws/aws-sdk-go-v2 v1.16.5 // indirect
2125
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.12 // indirect
2226
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.6 // indirect
2327
github.com/aws/aws-sdk-go-v2/service/kms v1.17.3 // indirect
2428
github.com/aws/smithy-go v1.11.3 // indirect
29+
github.com/beorn7/perks v1.0.1 // indirect
30+
github.com/cespare/xxhash/v2 v2.1.2 // indirect
2531
github.com/davecgh/go-spew v1.1.1 // indirect
2632
github.com/eapache/go-resiliency v1.2.0 // indirect
2733
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
2834
github.com/eapache/queue v1.1.0 // indirect
2935
github.com/gofrs/uuid v4.2.0+incompatible // indirect
36+
github.com/golang/protobuf v1.5.2 // indirect
3037
github.com/golang/snappy v0.0.4 // indirect
3138
github.com/hashicorp/errwrap v1.0.0 // indirect
3239
github.com/hashicorp/go-multierror v1.1.1 // indirect
@@ -36,18 +43,28 @@ require (
3643
github.com/jcmturner/gofork v1.0.0 // indirect
3744
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
3845
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
39-
github.com/klauspost/compress v1.15.6 // indirect
40-
github.com/mattn/go-colorable v0.1.12 // indirect
41-
github.com/mattn/go-isatty v0.0.16 // indirect
46+
github.com/klauspost/compress v1.15.9 // indirect
47+
github.com/mattn/go-colorable v0.1.13 // indirect
48+
github.com/mattn/go-isatty v0.0.17 // indirect
49+
github.com/mattn/go-runewidth v0.0.14 // indirect
50+
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
4251
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
4352
github.com/pkg/errors v0.9.1 // indirect
53+
github.com/prometheus/client_model v0.2.0 // indirect
54+
github.com/prometheus/common v0.32.1 // indirect
55+
github.com/prometheus/procfs v0.7.3 // indirect
4456
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
57+
github.com/rivo/uniseg v0.2.0 // indirect
4558
github.com/spf13/cast v1.5.0 // indirect
59+
github.com/valyala/bytebufferpool v1.0.0 // indirect
60+
github.com/valyala/fasthttp v1.44.0 // indirect
61+
github.com/valyala/tcplisten v1.0.0 // indirect
4662
github.com/volatiletech/inflect v0.0.1 // indirect
4763
github.com/volatiletech/randomize v0.0.1 // indirect
4864
golang.org/x/net v0.1.0 // indirect
4965
golang.org/x/tools v0.2.0 // indirect
5066
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
67+
google.golang.org/protobuf v1.28.0 // indirect
5168
)
5269

5370
require (

go.sum

Lines changed: 40 additions & 6 deletions
Large diffs are not rendered by default.

internal/config/settings.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package config
2+
3+
import "github.com/DIMO-Network/shared/db"
4+
5+
type Settings struct {
6+
EthereumRPCURL string `yaml:"ETHEREUM_RPC_URL"`
7+
8+
BlockConfirmations int `yaml:"BLOCK_CONFIRMATIONS"`
9+
10+
EventStreamTopic string `yaml:"EVENT_STREAM_TOPIC"`
11+
12+
KafkaBroker string `yaml:"KAFKA_BROKER"`
13+
Partitions int `yaml:"PARTITIONS"`
14+
15+
DB db.Settings `yaml:"DB"`
16+
17+
MonitoringPort string `yaml:"MONITORING_PORT"`
18+
}

internal/block_listener.go renamed to internal/services/block_listener.go

Lines changed: 10 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ import (
1414
"syscall"
1515
"time"
1616

17+
"github.com/DIMO-Network/contract-event-processor/internal/config"
18+
"github.com/DIMO-Network/contract-event-processor/models"
1719
"github.com/DIMO-Network/shared"
20+
"github.com/DIMO-Network/shared/db"
1821
"github.com/Shopify/sarama"
1922
ethereum "github.com/ethereum/go-ethereum"
2023
"github.com/ethereum/go-ethereum/accounts/abi"
@@ -28,30 +31,8 @@ import (
2831
"gopkg.in/yaml.v3"
2932

3033
_ "github.com/lib/pq"
31-
32-
"event-stream/models"
3334
)
3435

35-
type Settings struct {
36-
WebSocketAddress string `yaml:"WEB_SOCKET_ADDRESS"`
37-
AlchemyAPIKey string `yaml:"API_KEY"`
38-
39-
BlockConfirmations int `yaml:"BLOCK_CONFIRMATIONS"`
40-
41-
EventStreamTopic string `yaml:"EVENT_STREAM_TOPIC"`
42-
43-
KafkaBroker string `yaml:"KAFKA_BROKER"`
44-
Partitions int `yaml:"PARTITIONS"`
45-
46-
PostgresUser string `yaml:"POSTGRES_USER"`
47-
PostgresPassword string `yaml:"POSTGRES_PASSWORD"`
48-
PostgresDB string `yaml:"POSTGRES_DB"`
49-
PostgresHOST string `yaml:"POSTGRES_HOST"`
50-
PostgresPort int `yaml:"POSTGRES_PORT"`
51-
52-
MonitoringPort string `yaml:"MONITORING_PORT"`
53-
}
54-
5536
type BlockListener struct {
5637
Client *ethclient.Client
5738
Contracts []common.Address
@@ -60,7 +41,7 @@ type BlockListener struct {
6041
EventStreamTopic string
6142
Registry map[common.Address]map[common.Hash]abi.Event
6243
Confirmations *big.Int
63-
DB *sql.DB
44+
DB db.Store
6445
ABIs map[common.Address]abi.ABI
6546
}
6647

@@ -78,32 +59,21 @@ type Block struct {
7859
Number *big.Int
7960
}
8061

81-
func NewBlockListener(s Settings, logger zerolog.Logger, producer sarama.SyncProducer) (BlockListener, error) {
82-
c, err := ethclient.Dial(s.WebSocketAddress + s.AlchemyAPIKey)
62+
func NewBlockListener(s config.Settings, logger zerolog.Logger, producer sarama.SyncProducer) (BlockListener, error) {
63+
c, err := ethclient.Dial(s.EthereumRPCURL)
8364
if err != nil {
8465
return BlockListener{}, err
8566
}
8667

87-
psqlInfo := fmt.Sprintf("host=%s port=%d user=%s "+
88-
"password=%s dbname=%s sslmode=disable",
89-
s.PostgresHOST, s.PostgresPort, s.PostgresUser, s.PostgresPassword, s.PostgresDB)
90-
91-
pg, err := sql.Open("postgres", psqlInfo)
92-
if err != nil {
93-
return BlockListener{}, err
94-
}
95-
err = pg.Ping()
96-
if err != nil {
97-
return BlockListener{}, err
98-
}
68+
pdb := db.NewDbConnectionFromSettings(context.TODO(), &s.DB, true)
9969

10070
return BlockListener{
10171
Client: c,
10272
EventStreamTopic: s.EventStreamTopic,
10373
Logger: logger,
10474
Producer: producer,
10575
Confirmations: big.NewInt(int64(s.BlockConfirmations)),
106-
DB: pg,
76+
DB: pdb,
10777
}, nil
10878
}
10979

@@ -149,7 +119,7 @@ func (bl *BlockListener) GetBlockHead(blockNum *big.Int) (*types.Header, error)
149119
return bl.Client.HeaderByNumber(context.Background(), blockNum)
150120
}
151121

152-
resp, err := models.Blocks(qm.OrderBy(models.BlockColumns.Number+" DESC")).One(context.Background(), bl.DB)
122+
resp, err := models.Blocks(qm.OrderBy(models.BlockColumns.Number+" DESC")).One(context.Background(), bl.DB.DBS().Reader)
153123
if err != nil {
154124
if errors.Is(err, sql.ErrNoRows) {
155125
bl.Logger.Info().Msg("no value passed or found in db; setting block head to current head minus five")
@@ -178,7 +148,7 @@ func (bl *BlockListener) RecordBlock(block *types.Header) error {
178148
Hash: block.Hash().Bytes(),
179149
}
180150

181-
return processedBlock.Insert(context.Background(), bl.DB, boil.Infer())
151+
return processedBlock.Insert(context.Background(), bl.DB.DBS().Writer, boil.Infer())
182152
}
183153

184154
func (bl *BlockListener) ChainIndexer(blockNum *big.Int) {

internal/kafka_service.go renamed to internal/services/kafka_service.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package services
22

33
import (
4+
"github.com/DIMO-Network/contract-event-processor/internal/config"
45
"github.com/Shopify/sarama"
56
)
67

@@ -13,7 +14,7 @@ type Event struct {
1314
EventName string `json:"eventName,omitempty"`
1415
}
1516

16-
func StartKafkaStream(s Settings) (sarama.Client, error) {
17+
func StartKafkaStream(s config.Settings) (sarama.Client, error) {
1718

1819
config := sarama.NewConfig()
1920
config.Producer.Return.Successes = true

internal/migrate.go renamed to internal/services/migrate.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,27 @@ import (
44
"database/sql"
55
"fmt"
66

7+
"github.com/DIMO-Network/contract-event-processor/internal/config"
78
"github.com/pressly/goose/v3"
89
"github.com/rs/zerolog"
910

1011
_ "github.com/lib/pq"
1112
)
1213

13-
func MigrateDatabase(logger zerolog.Logger, s *Settings, command, schemaName string) {
14+
func MigrateDatabase(logger zerolog.Logger, s *config.Settings, command, schemaName string) {
15+
var db *sql.DB
1416
// setup database
15-
psqlInfo := fmt.Sprintf("host=%s port=%d user=%s "+
16-
"password=%s dbname=%s sslmode=disable",
17-
s.PostgresHOST, s.PostgresPort, s.PostgresUser, s.PostgresPassword, s.PostgresDB)
18-
19-
pg, err := sql.Open("postgres", psqlInfo)
17+
db, err := sql.Open("postgres", s.DB.BuildConnectionString(true))
18+
defer func() {
19+
if err := db.Close(); err != nil {
20+
logger.Fatal().Msgf("goose: failed to close DB: %v\n", err)
21+
}
22+
}()
2023
if err != nil {
21-
logger.Fatal().Msgf("failed to establish db connection: %v\n", err)
24+
logger.Fatal().Msgf("failed to open db connection: %v\n", err)
2225
}
2326

24-
if err = pg.Ping(); err != nil {
27+
if err = db.Ping(); err != nil {
2528
logger.Fatal().Msgf("failed to ping db: %v\n", err)
2629
}
2730

@@ -30,12 +33,12 @@ func MigrateDatabase(logger zerolog.Logger, s *Settings, command, schemaName str
3033
command = "up"
3134
}
3235
// must create schema so that can set migrations table to that schema
33-
_, err = pg.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s;", schemaName))
36+
_, err = db.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s;", schemaName))
3437
if err != nil {
3538
logger.Fatal().Err(err).Msgf("could not create schema, %s", schemaName)
3639
}
3740
goose.SetTableName(fmt.Sprintf("%s.migrations", schemaName))
38-
if err := goose.Run(command, pg, "migrations"); err != nil {
41+
if err := goose.Run(command, db, "migrations"); err != nil {
3942
logger.Fatal().Msgf("failed to apply go code migrations: %v\n", err)
4043
}
4144
// if we add any code migrations import _ "github.com/DIMO-Network/users-api/migrations" // migrations won't work without this

sample.settings.yaml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
WEB_SOCKET_ADDRESS: wss://polygon-mainnet.g.alchemy.com/v2/
2-
API_KEY:
1+
ETHEREUM_RPC_URL: wss://polygon-mainnet.g.alchemy.com/v2/
32
EVENT_STREAM_TOPIC: topic.contract.event
43
PARTITIONS: 1
54
KAFKA_BROKER: localhost:9092

0 commit comments

Comments
 (0)