-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
54 lines (39 loc) · 1.28 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# Builtin
import time
import json
import sys
from pathlib import Path
from dataclasses import dataclass
# 3rd party
import yaml
from pydantic import BaseModel
# Custom
from datamodels import APIConfig
from duneservice import *
from notificationservice import NotificationService
def get_user_config(config_path: Path):
    """Load and parse the user's YAML configuration file.

    Per the original author's note, this should be the YAML file written by
    the server code, named with the contract address.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file's contents.
    """
    # The context manager closes the handle even if parsing raises.
    with config_path.open(mode='r') as config_file:
        return yaml.safe_load(config_file)
def main():
    """Run the query-and-notify polling loop for one user configuration.

    Expects exactly one command-line argument: the path to the user's YAML
    config file. API settings are read from ``api_keys.json`` (resolved
    relative to the current working directory). The function then loops
    forever: it runs the Dune query, and unless the result is the string
    ``"failed"``, prints it, hands it to the notification service, and
    sleeps 10 seconds before the next iteration.

    Raises:
        SystemExit: If the script was not given exactly one argument.
    """
    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if len(sys.argv) != 2:
        raise SystemExit("Incorrect number of arguments")
    silent = False

    # Context manager guarantees the key file is closed even if parsing fails.
    with open('api_keys.json') as f:
        api_keys = json.load(f)
    api_config = APIConfig(**api_keys)

    # Load user info and configure services
    user_config = get_user_config(Path(sys.argv[1]))
    dune_service = DuneService(api_config, user_config)
    notify_service = NotificationService(api_config, user_config, silent=silent)

    while True:
        print("Running dune query")
        response = dune_service.run_query_loop()
        print(response)
        if response == "failed":
            # NOTE(review): a failed query is retried immediately, skipping
            # the 10s sleep below -- confirm this is the intended behavior.
            continue
        print(json.dumps(response, indent=2))
        print("Sending notifications async")
        notify_service.notify(response)
        print("Query loop done, sleeping 10s")
        time.sleep(10)
# Start the polling loop only when this file is executed as a script, so that
# importing the module does not block in main()'s infinite loop.
if __name__ == "__main__":
    main()