Skip to content

Commit 3165daa

Browse files
committed
implement generic worker pool
Add Job abstraction and define interface for workers. Implement concrete worker for blocks and transactions.
1 parent f7b3e8e commit 3165daa

File tree

8 files changed

+140
-102
lines changed

8 files changed

+140
-102
lines changed

export.go

+12-11
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ import (
44
"bufio"
55
"fmt"
66
"os"
7-
"philenius/ethereum-transaction-export/models"
7+
"philenius/ethereum-transaction-export/work"
88
"time"
99

1010
"github.com/mgutz/logxi/v1"
1111
)
1212

13-
func exportAsCSV(txChan chan *models.Tx) {
13+
func exportAsCSV(jobs chan *work.Job) {
1414

1515
now := time.Now().Format("2006-01-02-15-04-05")
1616
f, err := os.Create(fmt.Sprintf("geth_tx_export_%s.csv", now))
@@ -24,13 +24,14 @@ func exportAsCSV(txChan chan *models.Tx) {
2424
w.Flush()
2525

2626
lineBuf := 0
27-
for transaction := range txChan {
28-
tx := transaction.Tx
27+
for job := range jobs {
28+
tx := job.Tx
2929

3030
lineBuf++
3131
line := fmt.Sprintf(
3232
"%s,%d,%s,%d,%d,%s,%s,%d,%d,%d,%s,%d\n",
33-
tx.Hash, tx.Nonce, tx.BlockHash, *tx.BlockNumber, *tx.TransactionIndex, tx.From, tx.To, tx.Value.Int64(), tx.Gas, tx.GasPrice.Int64(), tx.Input, transaction.Timestamp,
33+
tx.Hash, tx.Nonce, tx.BlockHash, *tx.BlockNumber, *tx.TransactionIndex, tx.From, tx.To,
34+
tx.Value.Int64(), tx.Gas, tx.GasPrice.Int64(), tx.Input, job.Timestamp,
3435
)
3536
w.WriteString(line)
3637

@@ -43,7 +44,7 @@ func exportAsCSV(txChan chan *models.Tx) {
4344
w.Flush()
4445
}
4546

46-
func exportFailedBlocks(blockChan chan int) {
47+
func exportFailedBlockJobs(jobs chan *work.Job) {
4748

4849
f, err := os.Create("failedBlocks.txt")
4950
if err != nil {
@@ -52,16 +53,16 @@ func exportFailedBlocks(blockChan chan int) {
5253

5354
w := bufio.NewWriter(f)
5455

55-
for block := range blockChan {
56-
w.WriteString(fmt.Sprintf("%d\n", block))
56+
for job := range jobs {
57+
w.WriteString(fmt.Sprintf("%d\n", job.BlockHeight))
5758
w.Flush()
5859
}
5960

6061
log.Info("finished writing to block error file")
6162
w.Flush()
6263
}
6364

64-
func exportFailedTx(txChan chan *models.TxHash) {
65+
func exportFailedTxJobs(jobs chan *work.Job) {
6566

6667
f, err := os.Create("failedTransactions.txt")
6768
if err != nil {
@@ -70,8 +71,8 @@ func exportFailedTx(txChan chan *models.TxHash) {
7071

7172
w := bufio.NewWriter(f)
7273

73-
for tx := range txChan {
74-
w.WriteString(fmt.Sprintf("%s\n", tx.Hash))
74+
for job := range jobs {
75+
w.WriteString(fmt.Sprintf("%s\n", job.TxHash))
7576
w.Flush()
7677
}
7778

main.go

+12-28
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/mgutz/logxi/v1"
1010
"github.com/onrik/ethrpc"
1111

12-
"philenius/ethereum-transaction-export/models"
1312
"philenius/ethereum-transaction-export/work"
1413
)
1514

@@ -21,6 +20,7 @@ func main() {
2120
port := flag.Int("port", 8545, "The port number of the Ethereum node.")
2221
startHeight := flag.Int("start", 0, "The height / number of the block where to start.")
2322
blockCount := flag.Int("count", 1000, "The total amount of blocks to fetch.")
23+
blockConcurr := flag.Int("blockConcurr", 10, "The count of concurrent workers for fetching block.")
2424
txConcurr := flag.Int("txConcurr", 10, "The count of concurrent workers for fetching transactions.")
2525
flag.Parse()
2626

@@ -32,11 +32,11 @@ func main() {
3232
}
3333
log.Info("successfully connected to Ethereum node", "host", *hostname, "port", *port, "version", version)
3434

35-
blockHeightChan := make(chan int, 10000)
36-
txHashChan := make(chan *models.TxHash, 10000)
37-
txChan := make(chan *models.Tx, 10000)
38-
failedBlockChan := make(chan int, 10000)
39-
failedTxChan := make(chan *models.TxHash, 10000)
35+
blockHeightChan := make(chan *work.Job, 10000)
36+
txHashChan := make(chan *work.Job, 10000)
37+
txChan := make(chan *work.Job, 10000)
38+
failedBlockChan := make(chan *work.Job, 10000)
39+
failedTxChan := make(chan *work.Job, 10000)
4040
latestBlock := 0
4141
latestTransactionCount := int64(0)
4242
wt := sync.WaitGroup{}
@@ -67,7 +67,7 @@ func main() {
6767
go func() {
6868
endHeight := *startHeight + *blockCount
6969
for i := *startHeight; i < endHeight; i++ {
70-
blockHeightChan <- i
70+
blockHeightChan <- &work.Job{BlockHeight: i}
7171
}
7272
log.Info("finished listing block numbers")
7373
close(blockHeightChan)
@@ -77,44 +77,28 @@ func main() {
7777

7878
// fetch all blocks
7979
go func() {
80-
for blockHeight := range blockHeightChan {
81-
latestBlock = blockHeight
82-
block, err := client.EthGetBlockByNumber(blockHeight, true)
83-
if err != nil {
84-
failedBlockChan <- blockHeight
85-
log.Error("failed to get block", "blockNumber", *blockCount, "err", err.Error())
86-
continue
87-
}
88-
if log.IsDebug() {
89-
log.Debug("successfully got block", "blockNumber", block.Number)
90-
}
91-
for _, tx := range block.Transactions {
92-
txHashChan <- &models.TxHash{block.Timestamp, tx.Hash}
93-
}
94-
}
95-
close(txHashChan)
96-
close(failedBlockChan)
97-
log.Info("finished fetching all blocks")
80+
p := work.NewBlockWorkerPool(*blockConcurr, clientAddr, blockHeightChan, txHashChan, failedBlockChan)
81+
p.Run()
9882
wt.Done()
9983
}()
10084
wt.Add(1)
10185

10286
// fetch all transactions
10387
go func() {
104-
p := work.NewPool(*txConcurr, clientAddr, txHashChan, txChan, failedTxChan)
88+
p := work.NewTxWorkerPool(*txConcurr, clientAddr, txHashChan, txChan, failedTxChan)
10589
p.Run()
10690
wt.Done()
10791
}()
10892
wt.Add(1)
10993

11094
go func() {
111-
exportFailedBlocks(failedBlockChan)
95+
exportFailedBlockJobs(failedBlockChan)
11296
wt.Done()
11397
}()
11498
wt.Add(1)
11599

116100
go func() {
117-
exportFailedTx(failedTxChan)
101+
exportFailedTxJobs(failedTxChan)
118102
wt.Done()
119103
}()
120104
wt.Add(1)

models/wrapperTypes.go

-15
This file was deleted.

work/blockWorker.go

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package work
2+
3+
import (
4+
"sync"
5+
6+
log "github.com/mgutz/logxi/v1"
7+
"github.com/onrik/ethrpc"
8+
)
9+
10+
type BlockWorker struct {
11+
client *ethrpc.EthRPC
12+
jobs chan *Job
13+
result chan *Job
14+
failedJobs chan *Job
15+
wt *sync.WaitGroup
16+
}
17+
18+
func (w *BlockWorker) doWork() {
19+
for job := range w.jobs {
20+
//latestBlock = blockHeight
21+
block, err := w.client.EthGetBlockByNumber(job.BlockHeight, true)
22+
if err != nil {
23+
w.failedJobs <- job
24+
log.Error("failed to get block", "blockNumber", job.BlockHeight, "err", err.Error())
25+
continue
26+
}
27+
if log.IsDebug() {
28+
log.Debug("successfully got block", "blockNumber", block.Number)
29+
}
30+
for _, tx := range block.Transactions {
31+
w.result <- &Job{TxHash: tx.Hash, Timestamp: block.Timestamp}
32+
}
33+
}
34+
log.Info("block worker finished")
35+
w.wt.Done()
36+
}

work/job.go

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package work
2+
3+
import (
4+
"github.com/onrik/ethrpc"
5+
)
6+
7+
type Job struct {
8+
BlockHeight int
9+
TxHash string
10+
Tx *ethrpc.Transaction
11+
Timestamp int
12+
}

work/txWorker.go

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package work
2+
3+
import (
4+
"sync"
5+
6+
log "github.com/mgutz/logxi/v1"
7+
"github.com/onrik/ethrpc"
8+
)
9+
10+
type TxWorker struct {
11+
client *ethrpc.EthRPC
12+
jobs chan *Job
13+
result chan *Job
14+
failedJobs chan *Job
15+
wt *sync.WaitGroup
16+
}
17+
18+
func (w *TxWorker) doWork() {
19+
for job := range w.jobs {
20+
//latestTransactionCount++
21+
transaction, err := w.client.EthGetTransactionByHash(job.TxHash)
22+
if err != nil || transaction.BlockNumber == nil || transaction.TransactionIndex == nil {
23+
w.failedJobs <- job
24+
log.Error("failed to get transaction", "txHash", job.TxHash)
25+
continue
26+
}
27+
28+
w.result <- &Job{
29+
Timestamp: job.Timestamp,
30+
Tx: transaction,
31+
}
32+
if log.IsDebug() {
33+
log.Debug("successfully got transaction", "txHash", transaction.Hash)
34+
}
35+
}
36+
log.Info("transaction worker finished")
37+
w.wt.Done()
38+
}

work/worker.go

+2-36
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,5 @@
11
package work
22

3-
import (
4-
"philenius/ethereum-transaction-export/models"
5-
"sync"
6-
7-
log "github.com/mgutz/logxi/v1"
8-
"github.com/onrik/ethrpc"
9-
)
10-
11-
type Worker struct {
12-
client *ethrpc.EthRPC
13-
jobs chan *models.TxHash
14-
result chan *models.Tx
15-
failedJobs chan *models.TxHash
16-
wt *sync.WaitGroup
17-
}
18-
19-
func (w *Worker) doWork() {
20-
for txHash := range w.jobs {
21-
//latestTransactionCount++
22-
transaction, err := w.client.EthGetTransactionByHash(txHash.Hash)
23-
if err != nil || transaction.BlockNumber == nil || transaction.TransactionIndex == nil {
24-
w.failedJobs <- txHash
25-
log.Error("failed to get transaction", "txHash", txHash.Hash)
26-
continue
27-
}
28-
29-
w.result <- &models.Tx{
30-
Timestamp: txHash.Timestamp,
31-
Tx: transaction,
32-
}
33-
if log.IsDebug() {
34-
log.Debug("successfully got transaction", "txHash", transaction.Hash)
35-
}
36-
}
37-
log.Info("finished fetching all transactions")
38-
w.wt.Done()
3+
type Worker interface {
4+
doWork()
395
}

work/workerPool.go

+28-12
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package work
22

33
import (
4-
"philenius/ethereum-transaction-export/models"
54
"sync"
65

76
log "github.com/mgutz/logxi/v1"
@@ -10,26 +9,43 @@ import (
109

1110
type Pool struct {
1211
wt *sync.WaitGroup
13-
worker []*Worker
14-
jobs chan *models.TxHash
15-
results chan *models.Tx
16-
failedJobs chan *models.TxHash
12+
workers []Worker
13+
results chan *Job
14+
failedJobs chan *Job
1715
}
1816

19-
// NewPool instantiates a pool of workers with given concurrency
20-
func NewPool(concurrency int, clientAddr string, jobs chan *models.TxHash, results chan *models.Tx, failedJobs chan *models.TxHash) *Pool {
17+
// NewTxWorkerPool instantiates a pool of workers with given concurrency for fetching transactions.
18+
func NewTxWorkerPool(concurrency int, clientAddr string, jobs chan *Job, results chan *Job, failedJobs chan *Job) *Pool {
2119

2220
wt := &sync.WaitGroup{}
2321

24-
workerArr := make([]*Worker, concurrency)
22+
workerArr := make([]Worker, concurrency)
2523
for i := 0; i < concurrency; i++ {
26-
workerArr[i] = &Worker{ethrpc.NewEthRPC(clientAddr), jobs, results, failedJobs, wt}
24+
workerArr[i] = &TxWorker{ethrpc.NewEthRPC(clientAddr), jobs, results, failedJobs, wt}
2725
}
2826

2927
return &Pool{
3028
wt: wt,
31-
worker: workerArr,
32-
jobs: jobs,
29+
workers: workerArr,
30+
results: results,
31+
failedJobs: failedJobs,
32+
}
33+
34+
}
35+
36+
// NewBlockWorkerPool instantiates a pool of workers with given concurrency for fetching blocks.
37+
func NewBlockWorkerPool(concurrency int, clientAddr string, jobs chan *Job, results chan *Job, failedJobs chan *Job) *Pool {
38+
39+
wt := &sync.WaitGroup{}
40+
41+
workerArr := make([]Worker, concurrency)
42+
for i := 0; i < concurrency; i++ {
43+
workerArr[i] = &BlockWorker{ethrpc.NewEthRPC(clientAddr), jobs, results, failedJobs, wt}
44+
}
45+
46+
return &Pool{
47+
wt: wt,
48+
workers: workerArr,
3349
results: results,
3450
failedJobs: failedJobs,
3551
}
@@ -38,7 +54,7 @@ func NewPool(concurrency int, clientAddr string, jobs chan *models.TxHash, resul
3854

3955
// Run starts the processing of incoming jobs
4056
func (p *Pool) Run() {
41-
for _, w := range p.worker {
57+
for _, w := range p.workers {
4258
go w.doWork()
4359
p.wt.Add(1)
4460
}

0 commit comments

Comments
 (0)