Skip to content

Commit 619b77a

Browse files
index chain and filter for relevant contracts
1 parent 3eeb2ac commit 619b77a

File tree

14 files changed

+369
-2968
lines changed

14 files changed

+369
-2968
lines changed

abi/issuance.json

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
[
2+
{
3+
"inputs": [
4+
{ "internalType": "address", "name": "user", "type": "address" },
5+
{ "internalType": "uint256", "name": "_amount", "type": "uint256" },
6+
{ "internalType": "uint256", "name": "vehicleNodeId", "type": "uint256" }
7+
],
8+
"name": "InvalidAddress",
9+
"type": "error"
10+
},
11+
{
12+
"inputs": [
13+
{ "internalType": "address", "name": "user", "type": "address" }
14+
],
15+
"name": "TestIssue",
16+
"type": "error"
17+
},
18+
{
19+
"anonymous": false,
20+
"inputs": [
21+
{
22+
"indexed": true,
23+
"internalType": "address",
24+
"name": "user",
25+
"type": "address"
26+
}
27+
],
28+
"name": "DQ",
29+
"type": "event"
30+
},
31+
{
32+
"anonymous": false,
33+
"inputs": [
34+
{
35+
"indexed": true,
36+
"internalType": "address",
37+
"name": "user",
38+
"type": "address"
39+
},
40+
{
41+
"indexed": false,
42+
"internalType": "uint256",
43+
"name": "_amount",
44+
"type": "uint256"
45+
},
46+
{
47+
"indexed": false,
48+
"internalType": "uint256",
49+
"name": "vehicleNodeId",
50+
"type": "uint256"
51+
}
52+
],
53+
"name": "DidntQualify",
54+
"type": "event"
55+
},
56+
{
57+
"anonymous": false,
58+
"inputs": [
59+
{
60+
"indexed": true,
61+
"internalType": "address",
62+
"name": "user",
63+
"type": "address"
64+
}
65+
],
66+
"name": "TestTransfer",
67+
"type": "event"
68+
},
69+
{
70+
"anonymous": false,
71+
"inputs": [
72+
{
73+
"indexed": true,
74+
"internalType": "address",
75+
"name": "user",
76+
"type": "address"
77+
},
78+
{
79+
"indexed": false,
80+
"internalType": "uint256",
81+
"name": "_amount",
82+
"type": "uint256"
83+
},
84+
{
85+
"indexed": false,
86+
"internalType": "uint256",
87+
"name": "vehicleNodeId",
88+
"type": "uint256"
89+
}
90+
],
91+
"name": "TokensTransferred",
92+
"type": "event"
93+
},
94+
{
95+
"inputs": [
96+
{ "internalType": "address[]", "name": "users", "type": "address[]" },
97+
{ "internalType": "uint256[]", "name": "values", "type": "uint256[]" },
98+
{ "internalType": "uint256[]", "name": "vehicleIds", "type": "uint256[]" }
99+
],
100+
"name": "batchTransfer",
101+
"outputs": [],
102+
"stateMutability": "nonpayable",
103+
"type": "function"
104+
},
105+
{
106+
"inputs": [
107+
{ "internalType": "address[]", "name": "users", "type": "address[]" }
108+
],
109+
"name": "tester",
110+
"outputs": [],
111+
"stateMutability": "nonpayable",
112+
"type": "function"
113+
}
114+
]

config.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
contracts:
2+
- address: 0x8129f3cD3EBA82136Caf5aB87E2321c958Da5B63
3+
startBlock: 28933550
4+
abi: abi/issuance.json

docker-compose.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
version: '3.9'
2+
3+
services:
4+
redis:
5+
image: redis:6.2
6+
container_name: redis-devices-api
7+
ports:
8+
- "6379:6379"
9+
10+
zookeeper:
11+
image: 'wurstmeister/zookeeper:latest'
12+
ports:
13+
- '2181:2181'
14+
environment:
15+
- ALLOW_ANONYMOUS_LOGIN=yes
16+
17+
kafka:
18+
image: 'wurstmeister/kafka:latest'
19+
ports:
20+
- '9092:9092'
21+
environment:
22+
- KAFKA_BROKER_ID=1
23+
- KAFKA_LISTENERS=PLAINTEXT://:9092
24+
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
25+
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
26+
- ALLOW_PLAINTEXT_LISTENER=yes
27+
depends_on:
28+
- zookeeper

go.mod

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.19
55
require (
66
github.com/Shopify/sarama v1.33.0
77
github.com/ethereum/go-ethereum v1.10.26
8+
gopkg.in/yaml.v3 v3.0.1
89
)
910

1011
require (
@@ -37,7 +38,6 @@ require (
3738
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
3839
github.com/rjeczalik/notify v0.9.1 // indirect
3940
golang.org/x/net v0.0.0-20220607020251-c690dde0001d // indirect
40-
gopkg.in/yaml.v3 v3.0.1 // indirect
4141
)
4242

4343
require (
@@ -55,6 +55,7 @@ require (
5555
github.com/tklauser/go-sysconf v0.3.5 // indirect
5656
github.com/tklauser/numcpus v0.2.2 // indirect
5757
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
58-
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
58+
golang.org/x/exp v0.0.0-20230108222341-4b8118a2686a
59+
golang.org/x/sys v0.1.0 // indirect
5960
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
6061
)

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
170170
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
171171
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 h1:f+lwQ+GtmgoY+A2YaQxlSOnDjXcQ7ZRLWOHbC6HtRqE=
172172
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
173+
golang.org/x/exp v0.0.0-20230108222341-4b8118a2686a h1:tlXy25amD5A7gOfbXdqCGN5k8ESEed/Ee1E5RcrYnqU=
174+
golang.org/x/exp v0.0.0-20230108222341-4b8118a2686a/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
173175
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
174176
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
175177
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -194,8 +196,8 @@ golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBc
194196
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
195197
golang.org/x/sys v0.0.0-20211107104306-e0b2ad06fe42/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
196198
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
197-
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
198-
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
199+
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
200+
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
199201
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
200202
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
201203
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

issuance_listener.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"log"
8+
"math/big"
9+
"os"
10+
"os/signal"
11+
"syscall"
12+
"time"
13+
14+
"github.com/DIMO-Network/shared"
15+
"github.com/Shopify/sarama"
16+
ethereum "github.com/ethereum/go-ethereum"
17+
"github.com/ethereum/go-ethereum/accounts/abi"
18+
"github.com/ethereum/go-ethereum/common"
19+
"github.com/ethereum/go-ethereum/core/types"
20+
"github.com/ethereum/go-ethereum/ethclient"
21+
"github.com/rs/zerolog"
22+
"github.com/segmentio/ksuid"
23+
)
24+
25+
func NewIssuanceContractListener(listener BlockListener, producer sarama.SyncProducer, logger *zerolog.Logger) (IssuanceContractListener, error) {
26+
return IssuanceContractListener{
27+
Address: common.HexToAddress("0x8129f3cD3EBA82136Caf5aB87E2321c958Da5B63"),
28+
Producer: producer,
29+
Logger: logger,
30+
Listener: listener,
31+
}, nil
32+
33+
}
34+
35+
type IssuanceContractListener struct {
36+
Logger *zerolog.Logger
37+
Address common.Address
38+
Listener BlockListener
39+
Producer sarama.SyncProducer
40+
}
41+
42+
func (rcl IssuanceContractListener) NextBlock() (*types.Header, error) {
43+
44+
// pull latest block from redis
45+
// if redis is empty, get the most recent block on chain
46+
// return rcl.Listener.Client.HeaderByNumber(context.Background(), nil)
47+
48+
head, err := rcl.Listener.Client.HeaderByNumber(context.Background(), nil)
49+
if err != nil {
50+
log.Fatal(err)
51+
}
52+
head.Number = big.NewInt(37850310)
53+
return head, nil
54+
}
55+
56+
func (rcl IssuanceContractListener) IssuanceContractIndexer() {
57+
58+
tick := time.NewTicker(2 * time.Second)
59+
defer tick.Stop()
60+
61+
sigChan := make(chan os.Signal, 1)
62+
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
63+
64+
fmt.Println("Running...")
65+
66+
Loop:
67+
for {
68+
select {
69+
case <-tick.C:
70+
71+
head, err := rcl.NextBlock()
72+
73+
block, err := rcl.Listener.Client.HeaderByNumber(context.Background(), new(big.Int).Sub(head.Number, rcl.Listener.Confirmations))
74+
if err != nil {
75+
log.Fatal(err)
76+
}
77+
78+
lastConf = &Block{Hash: block.Hash(), Number: block.Number}
79+
err = rcl.ProcessBlock(rcl.Listener.Client, lastConf)
80+
if err != nil {
81+
log.Fatal(err)
82+
}
83+
84+
head.Number = new(big.Int).Add(head.Number, big.NewInt(1))
85+
86+
case sig := <-sigChan:
87+
log.Printf("Received signal, terminating: %s", sig)
88+
break Loop
89+
}
90+
}
91+
92+
}
93+
94+
func (rcl IssuanceContractListener) ProcessBlock(client *ethclient.Client, block *Block) error {
95+
log.Printf("Processing block %s", block.Number)
96+
97+
fil := ethereum.FilterQuery{
98+
BlockHash: &block.Hash,
99+
Addresses: []common.Address{rcl.Address},
100+
}
101+
logs, err := client.FilterLogs(context.Background(), fil)
102+
if err != nil {
103+
return err
104+
}
105+
106+
for _, vLog := range logs {
107+
if vLog.Removed {
108+
rcl.Logger.Info().Uint64("Block Number", vLog.BlockNumber).Msg("Log removed")
109+
}
110+
111+
fmt.Println("Block Number: ", vLog.BlockNumber)
112+
if ev, ok := rcl.Listener.Registry[rcl.Address][vLog.Topics[0]]; ok {
113+
114+
event := shared.CloudEvent[Event]{
115+
ID: ksuid.New().String(),
116+
Source: string(vLog.Address.String()),
117+
Subject: vLog.TxHash.String(),
118+
Time: time.Now().UTC(),
119+
Data: Event{
120+
Contract: vLog.Address.String(),
121+
Sig: vLog.Topics[0].String(),
122+
}}
123+
124+
event.Data.Arguments = make(map[string]any)
125+
err = ev.Inputs.UnpackIntoMap(event.Data.Arguments, vLog.Data)
126+
var indexed abi.Arguments
127+
for _, arg := range ev.Inputs {
128+
if arg.Indexed {
129+
indexed = append(indexed, arg)
130+
}
131+
}
132+
// TODO-- topic slice looks odd, was getting mismatched length error before
133+
err = abi.ParseTopicsIntoMap(event.Data.Arguments, indexed, vLog.Topics[len(vLog.Topics)-len(indexed):])
134+
if err != nil {
135+
log.Fatal(err)
136+
}
137+
138+
eBytes, _ := json.Marshal(event)
139+
message := &sarama.ProducerMessage{Topic: rcl.Listener.EventStreamTopic, Key: sarama.StringEncoder(ksuid.New().String()), Value: sarama.ByteEncoder(eBytes)}
140+
_, _, err := rcl.Producer.SendMessage(message)
141+
if err != nil {
142+
rcl.Logger.Info().Str(rcl.Listener.EventStreamTopic, ev.Name).Msgf("error sending event to stream: %v", err)
143+
}
144+
145+
}
146+
}
147+
148+
return nil
149+
}

kafka_service.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ import (
44
"github.com/Shopify/sarama"
55
)
66

7+
type Event struct {
8+
Contract string
9+
Sig string
10+
Arguments map[string]any
11+
}
12+
713
func startKafkaStream(s Settings) (sarama.Client, error) {
814

915
config := sarama.NewConfig()

main.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ func main() {
2020
if err != nil {
2121
log.Fatal(err)
2222
}
23+
listener.CompileRegistryMap("config.yaml")
2324

2425
kafkaClient, err := startKafkaStream(settings)
2526
if err != nil {
@@ -32,11 +33,8 @@ func main() {
3233
log.Fatal(err)
3334
}
3435

35-
rewardClient, err := NewRewardContractListener(listener, producer, &logger)
36-
randomContract, err := NewRandomContractListener(listener, producer, &logger)
36+
issuanceClient, err := NewIssuanceContractListener(listener, producer, &logger)
3737

38-
go rewardClient.RewardContractListener()
39-
// random contract here bc its more active than ours and is easier to test with
40-
randomContract.RandomContractListener()
38+
issuanceClient.IssuanceContractIndexer()
4139

4240
}

0 commit comments

Comments
 (0)