-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclient.go
275 lines (233 loc) · 6.94 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
package gitopia
import (
"context"
"encoding/hex"
"fmt"
"io"
"math"
"strings"
"time"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
jsonrpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/tx"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/authz"
"github.com/gitopia/gitopia-go/logger"
gtypes "github.com/gitopia/gitopia/v5/x/gitopia/types"
rtypes "github.com/gitopia/gitopia/v5/x/rewards/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// Package-level constants used by the Gitopia client.
const (
// The literal "gitopia"; presumably the bech32 account address prefix —
// not referenced in this file, confirm against callers.
GITOPIA_ACC_ADDRESS_PREFIX = "gitopia"
// Gas adjustment applied to the tx factory in BroadcastTx before gas is
// calculated.
GAS_ADJUSTMENT = 1.8
// Upper bound on the number of height polls performed by waitForBlockHeight.
MAX_TRIES = 5
// Upper bound on the number of tx lookups performed by waitForTx.
MAX_WAIT_BLOCKS = 10
// The literal "/websocket"; presumably the Tendermint websocket path —
// not referenced in this file, confirm against callers.
TM_WS_ENDPOINT = "/websocket"
)
// Query groups the gRPC query clients built by GetQueryClient.
// NOTE: GetQueryClient fills this struct with a positional literal, so the
// field order here must not change.
type Query struct {
// Query client created with gtypes.NewQueryClient.
Gitopia gtypes.QueryClient
// Query client created with rtypes.NewQueryClient.
Rewards rtypes.QueryClient
}
// Client bundles everything needed to broadcast transactions and query the
// chain. It is created by NewClient and released with Close.
type Client struct {
// Client context; NewClient sets its output to the log writer w.
cc client.Context
// Tx factory used as the base for every broadcast.
txf tx.Factory
// JSON-RPC client created from cc.NodeURI; used for the "status" and "tx"
// calls.
rc *jsonrpcclient.Client
// Debug-level log writer obtained in NewClient; closed by Close.
w *io.PipeWriter
// Embedded query clients.
Query
}
// NewClient builds a Client from the given client context and tx factory.
//
// The logger carried by ctx is exposed as a debug-level writer, which is
// installed as the output of cc and kept so that Close can release it. A
// JSON-RPC client is created for cc.NodeURI and the gRPC query clients are
// created for GITOPIA_ADDR.
//
// On failure the zero Client and a wrapped error are returned. The log writer
// opened at the start is closed again in that case so it does not leak.
func NewClient(ctx context.Context, cc client.Context, txf tx.Factory) (Client, error) {
	w := logger.FromContext(ctx).WriterLevel(logrus.DebugLevel)
	cc = cc.WithOutput(w)

	rc, err := jsonrpcclient.New(cc.NodeURI)
	if err != nil {
		// Best effort: the construction error is the one worth reporting.
		_ = w.Close()
		return Client{}, errors.Wrap(err, "error creating rpc client")
	}

	q, err := GetQueryClient(GITOPIA_ADDR)
	if err != nil {
		// Best effort: the construction error is the one worth reporting.
		_ = w.Close()
		return Client{}, errors.Wrap(err, "error creating query client")
	}

	return Client{
		cc:    cc,
		txf:   txf,
		rc:    rc,
		w:     w,
		Query: q,
	}, nil
}
// GetQueryClient dials addr over gRPC with insecure transport credentials and
// a proto codec forced on every call, then returns the gitopia and rewards
// query clients, both backed by that single connection. A dial error is
// returned wrapped, together with the zero Query.
func GetQueryClient(addr string) (Query, error) {
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.NewProtoCodec(nil).GRPCCodec())),
	}

	conn, err := grpc.Dial(addr, dialOpts...)
	if err != nil {
		return Query{}, errors.Wrap(err, "error connecting to gitopia")
	}

	return Query{
		Gitopia: gtypes.NewQueryClient(conn),
		Rewards: rtypes.NewQueryClient(conn),
	}, nil
}
// Close implements io.Closer by closing the client's log writer.
//
// A Client without a writer — such as the zero Client that NewClient returns
// alongside an error — is closed as a no-op instead of dereferencing a nil
// pointer.
func (c Client) Close() error {
	if c.w == nil {
		return nil
	}
	return c.w.Close()
}
// QueryClient returns a pointer to the embedded Query. Because the receiver is
// passed by value, the pointer refers to this call's copy of the Client.
func (c Client) QueryClient() *Query {
	q := &c.Query
	return q
}
// Address returns the from-address stored in the client context.
func (c Client) Address() sdk.AccAddress {
	from := c.cc.FromAddress
	return from
}
// AuthorizedBroadcastTx wraps msg in an authz MsgExec built with the client's
// from-address, broadcasts it with the client as fee payer, and waits for the
// transaction to be found on chain.
//
// It returns an error when broadcasting fails, when the transaction cannot be
// found within the wait limits, or when the transaction was found but its
// result carries a non-zero code.
func (c Client) AuthorizedBroadcastTx(ctx context.Context, msg sdk.Msg) error {
	execMsg := authz.NewMsgExec(c.cc.FromAddress, []sdk.Msg{msg})

	// !!HACK!! set sequence to 0 to force refresh account sequence for every txn
	txHash, err := BroadcastTx(c.cc, c.txf.WithSequence(0).WithFeePayer(c.Address()), &execMsg)
	if err != nil {
		return err
	}

	res, err := c.waitForTx(ctx, txHash)
	if err != nil {
		return errors.Wrapf(err, "error waiting for tx %s", txHash)
	}

	// Being found on chain only means the tx was committed; a non-zero code
	// means its execution failed.
	if res.TxResult.Code != 0 {
		return errors.Errorf("tx %s failed with code %d: %s", txHash, res.TxResult.Code, res.TxResult.Log)
	}
	return nil
}
// BroadcastTxAndWait broadcasts the given messages in a single transaction,
// with the client as fee payer, and waits for the transaction to be found on
// chain.
//
// It returns an error when broadcasting fails, when the transaction cannot be
// found within the wait limits, or when the transaction was found but its
// result carries a non-zero code.
func (c Client) BroadcastTxAndWait(ctx context.Context, msg ...sdk.Msg) error {
	// !!HACK!! set sequence to 0 to force refresh account sequence for every txn
	txHash, err := BroadcastTx(c.cc, c.txf.WithSequence(0).WithFeePayer(c.Address()), msg...)
	if err != nil {
		return err
	}

	res, err := c.waitForTx(ctx, txHash)
	if err != nil {
		return errors.Wrapf(err, "error waiting for tx %s", txHash)
	}

	// Being found on chain only means the tx was committed; a non-zero code
	// means its execution failed.
	if res.TxResult.Code != 0 {
		return errors.Errorf("tx %s failed with code %d: %s", txHash, res.TxResult.Code, res.TxResult.Log)
	}
	return nil
}
// BroadcastTx generates, signs and broadcasts a transaction containing msgs.
//
// Steps: prepare the factory against clientCtx, calculate gas with
// GAS_ADJUSTMENT applied, derive the fee from the adjusted gas via
// calculateFee, build and sign the transaction with the context's from-name,
// encode it and broadcast it.
//
// It returns the transaction hash on success. It returns an empty hash and an
// error when any step fails, or when the node answers the broadcast with a
// non-zero response code.
func BroadcastTx(clientCtx client.Context, txf tx.Factory, msgs ...sdk.Msg) (string, error) {
	txf, err := txf.Prepare(clientCtx)
	if err != nil {
		return "", err
	}

	txf = txf.WithGasAdjustment(GAS_ADJUSTMENT)
	_, adjusted, err := tx.CalculateGas(clientCtx, txf, msgs...)
	if err != nil {
		return "", err
	}

	fees, err := calculateFee(adjusted)
	if err != nil {
		return "", err
	}
	txf = txf.WithGas(adjusted).WithFees(fees.String())

	txn, err := txf.BuildUnsignedTx(msgs...)
	if err != nil {
		return "", err
	}

	if err := tx.Sign(txf, clientCtx.GetFromName(), txn, true); err != nil {
		return "", err
	}

	txBytes, err := clientCtx.TxConfig.TxEncoder()(txn.GetTx())
	if err != nil {
		return "", err
	}

	// broadcast to a Tendermint node
	res, err := clientCtx.BroadcastTx(txBytes)
	if err != nil {
		return "", err
	}

	// A nil error with a non-zero code means the node rejected the tx;
	// report it now instead of handing back a hash for a rejected tx.
	if res.Code != 0 {
		return "", errors.Errorf("tx %s rejected with code %d (codespace %q): %s",
			res.TxHash, res.Code, res.Codespace, res.RawLog)
	}
	return res.TxHash, nil
}
// Status issues the "status" JSON-RPC call with no parameters and returns the
// decoded result, or nil and the call error.
func (c Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) {
	status := new(ctypes.ResultStatus)
	noParams := map[string]interface{}{}

	if _, err := c.rc.Call(ctx, "status", noParams, status); err != nil {
		return nil, err
	}
	return status, nil
}
// latestBlockHeight returns the latest block height reported in the sync info
// of the node status, or 0 and the error when the status call fails.
func (c Client) latestBlockHeight(ctx context.Context) (int64, error) {
	status, err := c.Status(ctx)
	if err != nil {
		return 0, err
	}

	height := status.SyncInfo.LatestBlockHeight
	return height, nil
}
// waitForNextBlock waits for one more block on top of the current height by
// delegating to waitForNBlocks with n = 1. Any error from that call is
// returned unchanged.
func (c Client) waitForNextBlock(ctx context.Context) error {
	const oneBlock int64 = 1
	return c.waitForNBlocks(ctx, oneBlock)
}
// waitForNBlocks reads the current block height and then waits, via
// waitForBlockHeight, until the chain is n blocks past it. An error from
// reading the height or from the wait is returned as is.
func (c Client) waitForNBlocks(ctx context.Context, n int64) error {
	current, err := c.latestBlockHeight(ctx)
	if err == nil {
		err = c.waitForBlockHeight(ctx, current+n)
	}
	return err
}
// waitForBlockHeight polls the latest block height until it reaches h.
//
// The height is checked at most MAX_TRIES times, with a one-second tick
// between checks. It returns nil once the height is at least h, the error from
// the height lookup if one fails, a wrapped ctx.Err() when ctx is done while
// waiting for a tick, and "timeout error" after the last tick passes without
// success.
func (c Client) waitForBlockHeight(ctx context.Context, h int64) error {
	poll := time.NewTicker(time.Second)
	defer poll.Stop()

	for attempt := 0; attempt < MAX_TRIES; attempt++ {
		current, err := c.latestBlockHeight(ctx)
		switch {
		case err != nil:
			return err
		case current >= h:
			return nil
		}

		select {
		case <-poll.C:
			// Interval elapsed; poll again.
		case <-ctx.Done():
			return errors.Wrap(ctx.Err(), "context is cancelled")
		}
	}

	return fmt.Errorf("timeout error")
}
// waitForTx looks up the transaction with the given hex-encoded hash through
// the "tx" JSON-RPC call, retrying up to MAX_WAIT_BLOCKS times.
//
// When the lookup error contains "not found", it waits for the next block and
// tries again; an error from that wait whose text contains "timeout" is
// ignored, and any other wait error is returned wrapped. Any other lookup
// error is returned wrapped immediately. If every attempt ends in "not found",
// "max block wait exceeded" is returned. An undecodable hash is reported
// before any call is made.
func (c Client) waitForTx(ctx context.Context, hash string) (*ctypes.ResultTx, error) {
	bz, err := hex.DecodeString(hash)
	if err != nil {
		return nil, errors.Wrapf(err, "unable to decode tx hash '%s'", hash)
	}

	for attempt := 0; attempt < MAX_WAIT_BLOCKS; attempt++ {
		var found ctypes.ResultTx
		_, callErr := c.rc.Call(ctx, "tx", map[string]interface{}{"hash": bz, "prove": false}, &found)
		if callErr == nil {
			// Tx found
			return &found, nil
		}
		if !strings.Contains(callErr.Error(), "not found") {
			return nil, errors.Wrapf(callErr, "fetching tx '%s'", hash)
		}

		// Tx not found yet: give the chain one more block before retrying.
		waitErr := c.waitForNextBlock(ctx)
		if waitErr != nil && !strings.Contains(waitErr.Error(), "timeout") {
			return nil, errors.Wrap(waitErr, "waiting for next block")
		}
	}

	return nil, fmt.Errorf("max block wait exceeded")
}
// calculateFee converts a gas amount into a fee: it parses GAS_PRICES as a
// decimal coin, multiplies its amount by gas, rounds the product up to a whole
// number and returns it as a single coin in the gas price's denom. A parse
// failure is returned as the error.
//
// NOTE(review): the multiplication is done in float64, so very large or
// high-precision values may not round exactly.
func calculateFee(gas uint64) (sdk.Coins, error) {
	price, err := sdk.ParseDecCoin(GAS_PRICES)
	if err != nil {
		return nil, err
	}

	rawFee := float64(gas) * price.Amount.MustFloat64()
	roundedUp := int64(math.Ceil(rawFee))

	feeCoin := sdk.NewCoin(price.Denom, sdk.NewInt(roundedUp))
	return sdk.NewCoins(feeCoin), nil
}