Skip to content

Commit 956a031

Browse files
committed
chore: update to common redis + top markers account flag
1 parent cd26f39 commit 956a031

13 files changed

+1700
-317
lines changed

.gitmodules

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[submodule "drift-common"]
2+
path = drift-common
3+
url = https://github.com/drift-labs/drift-common

Dockerfile

+8
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,15 @@ RUN npm install -g yarn
55
RUN npm install -g typescript
66

77
WORKDIR /app
8+
COPY drift-common /app/drift-common
89
COPY . .
10+
WORKDIR /app/drift-common/protocol/sdk
11+
RUN yarn
12+
RUN yarn build
13+
WORKDIR /app/drift-common/common-ts
14+
RUN yarn
15+
RUN yarn build
16+
WORKDIR /app
917
RUN yarn
1018
RUN yarn build
1119

package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
"license": "Apache-2.0",
77
"dependencies": {
88
"@coral-xyz/anchor": "^0.29.0",
9-
"@drift-labs/sdk": "2.82.0-beta.16",
9+
"@drift/common": "file:./drift-common/common-ts",
10+
"@drift-labs/sdk": "file:./drift-common/protocol/sdk",
1011
"@opentelemetry/api": "^1.1.0",
1112
"@opentelemetry/auto-instrumentations-node": "^0.31.1",
1213
"@opentelemetry/exporter-prometheus": "^0.31.0",

src/dlob-subscriber/DLOBSubscriberIO.ts

+16-15
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ import {
99
groupL2,
1010
isVariant,
1111
} from '@drift-labs/sdk';
12-
import { RedisClient } from '../utils/redisClient';
12+
import { RedisClient } from '@drift/common';
13+
1314
import {
1415
SubscriberLookup,
1516
addMarketSlotToResponse,
@@ -261,25 +262,25 @@ export class DLOBSubscriberIO extends DLOBSubscriber {
261262
asks: l2Formatted.asks.slice(0, 5),
262263
});
263264

264-
this.redisClient.client.publish(
265+
this.redisClient.publish(
265266
`orderbook_${marketType}_${marketArgs.marketIndex}`,
266-
JSON.stringify(l2Formatted)
267+
l2Formatted
267268
);
268-
this.redisClient.client.set(
269+
this.redisClient.set(
269270
`last_update_orderbook_${marketType}_${marketArgs.marketIndex}`,
270-
JSON.stringify(l2Formatted_depth100)
271+
l2Formatted_depth100
271272
);
272-
this.redisClient.client.set(
273+
this.redisClient.set(
273274
`last_update_orderbook_${marketType}_${marketArgs.marketIndex}_depth_100`,
274-
JSON.stringify(l2Formatted_depth100)
275+
l2Formatted_depth100
275276
);
276-
this.redisClient.client.set(
277+
this.redisClient.set(
277278
`last_update_orderbook_${marketType}_${marketArgs.marketIndex}_depth_20`,
278-
JSON.stringify(l2Formatted_depth20)
279+
l2Formatted_depth20
279280
);
280-
this.redisClient.client.set(
281+
this.redisClient.set(
281282
`last_update_orderbook_${marketType}_${marketArgs.marketIndex}_depth_5`,
282-
JSON.stringify(l2Formatted_depth5)
283+
l2Formatted_depth5
283284
);
284285

285286
const oraclePriceData =
@@ -306,9 +307,9 @@ export class DLOBSubscriberIO extends DLOBSubscriber {
306307
numMakers: 4,
307308
})
308309
.map((x) => x.toString());
309-
this.redisClient.client.set(
310+
this.redisClient.set(
310311
`last_update_orderbook_best_makers_${marketType}_${marketArgs.marketIndex}`,
311-
JSON.stringify({ bids, asks, slot })
312+
{ bids, asks, slot }
312313
);
313314
}
314315

@@ -361,9 +362,9 @@ export class DLOBSubscriberIO extends DLOBSubscriber {
361362
marketArgs.marketIndex
362363
);
363364

364-
this.redisClient.client.set(
365+
this.redisClient.set(
365366
`last_update_orderbook_l3_${marketType}_${marketArgs.marketIndex}`,
366-
JSON.stringify(l3)
367+
l3
367368
);
368369
}
369370
}

src/index.ts

+64-59
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
OrderSubscriber,
2020
MarketType,
2121
} from '@drift-labs/sdk';
22+
import { RedisClient, RedisClientPrefix } from '@drift/common';
2223

2324
import { logger, setLogLevel } from './utils/logger';
2425

@@ -42,43 +43,19 @@ import {
4243
normalizeBatchQueryParams,
4344
sleep,
4445
validateDlobQuery,
46+
getAccountFromId,
4547
} from './utils/utils';
4648
import FEATURE_FLAGS from './utils/featureFlags';
4749
import { getDLOBProviderFromOrderSubscriber } from './dlobProvider';
48-
import { RedisClient } from './utils/redisClient';
4950

5051
require('dotenv').config();
5152

5253
// Reading in Redis env vars
53-
const REDIS_HOSTS = process.env.REDIS_HOSTS?.replace(/^\[|\]$/g, '')
54+
const REDIS_CLIENTS = process.env.REDIS_CLIENTS?.replace(/^\[|\]$/g, '')
5455
.split(',')
55-
.map((host) => host.trim()) || ['localhost'];
56-
const REDIS_PORTS = process.env.REDIS_PORTS?.replace(/^\[|\]$/g, '')
57-
.split(',')
58-
.map((port) => parseInt(port.trim(), 10)) || [6379];
59-
const REDIS_PASSWORDS_ENV = process.env.REDIS_PASSWORDS || "['']";
60-
61-
let REDIS_PASSWORDS;
62-
63-
if (REDIS_PASSWORDS_ENV.trim() === "['']") {
64-
REDIS_PASSWORDS = [undefined];
65-
} else {
66-
REDIS_PASSWORDS = REDIS_PASSWORDS_ENV.replace(/^\[|\]$/g, '')
67-
.split(/\s*,\s*/)
68-
.map((pwd) => pwd.replace(/(^'|'$)/g, '').trim())
69-
.map((pwd) => (pwd === '' ? undefined : pwd));
70-
}
56+
.map((clients) => clients.trim()) || ['DLOB'];
7157

72-
console.log('Redis Hosts:', REDIS_HOSTS);
73-
console.log('Redis Ports:', REDIS_PORTS);
74-
console.log('Redis Passwords:', REDIS_PASSWORDS);
75-
76-
if (
77-
REDIS_PORTS.length !== REDIS_PASSWORDS.length ||
78-
REDIS_PORTS.length !== REDIS_HOSTS.length
79-
) {
80-
throw 'REDIS_HOSTS and REDIS_PASSWORDS and REDIS_PORTS must be the same length';
81-
}
58+
console.log('Redis Clients:', REDIS_CLIENTS);
8259

8360
const driftEnv = (process.env.ENV || 'devnet') as DriftEnv;
8461
const commitHash = process.env.COMMIT;
@@ -316,16 +293,11 @@ const main = async (): Promise<void> => {
316293
> = new Map();
317294
if (useRedis) {
318295
logger.info('Connecting to redis');
319-
for (let i = 0; i < REDIS_HOSTS.length; i++) {
320-
redisClients.push(
321-
new RedisClient(
322-
REDIS_HOSTS[i],
323-
REDIS_PORTS[i].toString(),
324-
REDIS_PASSWORDS[i] || undefined
325-
)
326-
);
327-
await redisClients[i].connect();
296+
for (let i = 0; i < REDIS_CLIENTS.length; i++) {
297+
const prefix = RedisClientPrefix[REDIS_CLIENTS[i]];
298+
redisClients.push(new RedisClient({ prefix }));
328299
}
300+
329301
for (let i = 0; i < sdkConfig.SPOT_MARKETS.length; i++) {
330302
spotMarketRedisMap.set(sdkConfig.SPOT_MARKETS[i].marketIndex, {
331303
client: redisClients[0],
@@ -344,6 +316,8 @@ const main = async (): Promise<void> => {
344316
}
345317
}
346318

319+
const userMapClient = new RedisClient({ prefix: RedisClientPrefix.USER_MAP });
320+
347321
function canRotate(marketType: MarketType, marketIndex: number) {
348322
if (isVariant(marketType, 'spot')) {
349323
const state = spotMarketRedisMap.get(marketIndex);
@@ -427,15 +401,21 @@ const main = async (): Promise<void> => {
427401
try {
428402
const { marketIndex, marketType } = req.query;
429403

430-
const fees = await redisClients[
431-
parseInt(process.env.HELIUS_REDIS_HOST_INDEX) ?? 0
432-
].client.get(`priorityFees_${marketType}_${marketIndex}`);
404+
const fees = await redisClients
405+
.find(
406+
(client) =>
407+
client.forceGetClient().options.keyPrefix ===
408+
RedisClientPrefix.DLOB_HELIUS
409+
)
410+
.getRaw(`priorityFees_${marketType}_${marketIndex}`);
411+
433412
if (fees) {
434413
res.status(200).json({
435414
...JSON.parse(fees),
436415
marketType,
437416
marketIndex,
438417
});
418+
439419
return;
440420
} else {
441421
res.writeHead(404);
@@ -466,11 +446,16 @@ const main = async (): Promise<void> => {
466446

467447
const fees = await Promise.all(
468448
normedParams.map(async (normedParam) => {
469-
const fees = await redisClients[
470-
parseInt(process.env.HELIUS_REDIS_HOST_INDEX) ?? 0
471-
].client.get(
472-
`priorityFees_${normedParam['marketType']}_${normedParam['marketIndex']}`
473-
);
449+
const fees = await redisClients
450+
.find(
451+
(client) =>
452+
client.forceGetClient().options.keyPrefix ===
453+
RedisClientPrefix.DLOB_HELIUS
454+
)
455+
.getRaw(
456+
`priorityFees_${normedParam['marketType']}_${normedParam['marketIndex']}`
457+
);
458+
474459
return {
475460
...JSON.parse(fees),
476461
marketType: normedParam['marketType'],
@@ -499,6 +484,7 @@ const main = async (): Promise<void> => {
499484
marketType,
500485
side, // bid or ask
501486
limit,
487+
includeAccounts,
502488
} = req.query;
503489

504490
const { normedMarketType, normedMarketIndex, error } = validateDlobQuery(
@@ -533,12 +519,17 @@ const main = async (): Promise<void> => {
533519
normedLimit = 4;
534520
}
535521

522+
let accountFlag = false;
523+
if (includeAccounts) {
524+
accountFlag = includeAccounts === 'true';
525+
}
526+
536527
let topMakers: string[];
537528
if (useRedis) {
538529
const redisClient = isVariant(normedMarketType, 'perp')
539530
? perpMarketRedisMap.get(normedMarketIndex).client
540531
: spotMarketRedisMap.get(normedMarketIndex).client;
541-
const redisResponse = await redisClient.client.get(
532+
const redisResponse = await redisClient.getRaw(
542533
`last_update_orderbook_best_makers_${getVariant(
543534
normedMarketType
544535
)}_${marketIndex}`
@@ -565,6 +556,13 @@ const main = async (): Promise<void> => {
565556
path: req.baseUrl + req.path,
566557
});
567558
res.writeHead(200);
559+
560+
if (accountFlag) {
561+
const topAccounts = await getAccountFromId(userMapClient, topMakers);
562+
res.end(JSON.stringify(topAccounts));
563+
return;
564+
}
565+
568566
res.end(JSON.stringify(topMakers));
569567
return;
570568
}
@@ -619,6 +617,13 @@ const main = async (): Promise<void> => {
619617
path: req.baseUrl + req.path,
620618
});
621619
res.writeHead(200);
620+
621+
if (accountFlag) {
622+
const topAccounts = await getAccountFromId(userMapClient, topMakers);
623+
res.end(JSON.stringify(topAccounts));
624+
return;
625+
}
626+
622627
res.end(JSON.stringify(topMakers));
623628
} catch (err) {
624629
next(err);
@@ -661,15 +666,15 @@ const main = async (): Promise<void> => {
661666
let redisL2: string;
662667
const redisClient = perpMarketRedisMap.get(normedMarketIndex).client;
663668
if (parseInt(adjustedDepth as string) === 5) {
664-
redisL2 = await redisClient.client.get(
669+
redisL2 = await redisClient.getRaw(
665670
`last_update_orderbook_perp_${normedMarketIndex}_depth_5`
666671
);
667672
} else if (parseInt(adjustedDepth as string) === 20) {
668-
redisL2 = await redisClient.client.get(
673+
redisL2 = await redisClient.getRaw(
669674
`last_update_orderbook_perp_${normedMarketIndex}_depth_20`
670675
);
671676
} else if (parseInt(adjustedDepth as string) === 100) {
672-
redisL2 = await redisClient.client.get(
677+
redisL2 = await redisClient.getRaw(
673678
`last_update_orderbook_perp_${normedMarketIndex}_depth_100`
674679
);
675680
}
@@ -694,15 +699,15 @@ const main = async (): Promise<void> => {
694699
let redisL2: string;
695700
const redisClient = spotMarketRedisMap.get(normedMarketIndex).client;
696701
if (parseInt(adjustedDepth as string) === 5) {
697-
redisL2 = await redisClient.client.get(
702+
redisL2 = await redisClient.getRaw(
698703
`last_update_orderbook_spot_${normedMarketIndex}_depth_5`
699704
);
700705
} else if (parseInt(adjustedDepth as string) === 20) {
701-
redisL2 = await redisClient.client.get(
706+
redisL2 = await redisClient.getRaw(
702707
`last_update_orderbook_spot_${normedMarketIndex}_depth_20`
703708
);
704709
} else if (parseInt(adjustedDepth as string) === 100) {
705-
redisL2 = await redisClient.client.get(
710+
redisL2 = await redisClient.getRaw(
706711
`last_update_orderbook_spot_${normedMarketIndex}_depth_100`
707712
);
708713
}
@@ -834,15 +839,15 @@ const main = async (): Promise<void> => {
834839
const redisClient =
835840
perpMarketRedisMap.get(normedMarketIndex).client;
836841
if (parseInt(adjustedDepth as string) === 5) {
837-
redisL2 = await redisClient.client.get(
842+
redisL2 = await redisClient.getRaw(
838843
`last_update_orderbook_perp_${normedMarketIndex}_depth_5`
839844
);
840845
} else if (parseInt(adjustedDepth as string) === 20) {
841-
redisL2 = await redisClient.client.get(
846+
redisL2 = await redisClient.getRaw(
842847
`last_update_orderbook_perp_${normedMarketIndex}_depth_20`
843848
);
844849
} else if (parseInt(adjustedDepth as string) === 100) {
845-
redisL2 = await redisClient.client.get(
850+
redisL2 = await redisClient.getRaw(
846851
`last_update_orderbook_perp_${normedMarketIndex}_depth_100`
847852
);
848853
}
@@ -870,15 +875,15 @@ const main = async (): Promise<void> => {
870875
const redisClient =
871876
spotMarketRedisMap.get(normedMarketIndex).client;
872877
if (parseInt(adjustedDepth as string) === 5) {
873-
redisL2 = await redisClient.client.get(
878+
redisL2 = await redisClient.getRaw(
874879
`last_update_orderbook_spot_${normedMarketIndex}_depth_5`
875880
);
876881
} else if (parseInt(adjustedDepth as string) === 20) {
877-
redisL2 = await redisClient.client.get(
882+
redisL2 = await redisClient.getRaw(
878883
`last_update_orderbook_spot_${normedMarketIndex}_depth_20`
879884
);
880885
} else if (parseInt(adjustedDepth as string) === 100) {
881-
redisL2 = await redisClient.client.get(
886+
redisL2 = await redisClient.getRaw(
882887
`last_update_orderbook_spot_${normedMarketIndex}_depth_100`
883888
);
884889
}
@@ -976,7 +981,7 @@ const main = async (): Promise<void> => {
976981
const redisClient = (
977982
marketTypeStr === 'spot' ? spotMarketRedisMap : perpMarketRedisMap
978983
).get(normedMarketIndex).client;
979-
const redisL3 = await redisClient.client.get(
984+
const redisL3 = await redisClient.getRaw(
980985
`last_update_orderbook_l3_${marketTypeStr}_${normedMarketIndex}`
981986
);
982987
if (

0 commit comments

Comments
 (0)