-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
162 lines (137 loc) · 4.29 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
"validators-health/internal/clients/clickhouse"
"validators-health/internal/handlers"
"validators-health/internal/migrations"
"validators-health/internal/notifier"
"validators-health/internal/scrapper"
"validators-health/internal/services"
)
var (
clickhouseService *services.ClickhouseService
cacheService *services.CacheService
)
func main() {
if err := migrations.CreateTables(); err != nil {
log.Fatalf("Error during table creation: %v (%s, %s, %s, %s)",
err,
os.Getenv("CLICKHOUSE_HOST"),
os.Getenv("CLICKHOUSE_DB"),
os.Getenv("CLICKHOUSE_USER"),
os.Getenv("CLICKHOUSE_PASSWORD"))
}
initServices()
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
var wg sync.WaitGroup
wg.Add(3)
stopChannel := make(chan struct{})
go runScrapper(&wg, stopChannel)
go runNotifier(&wg, stopChannel)
go runBackend(&wg, stopChannel)
<-stop
log.Println("Shutting down gracefully...")
close(stopChannel)
wg.Wait()
<-stop
log.Println("Shutting down gracefully...")
}
func initServices() {
redisClient := redis.NewClient(&redis.Options{
Addr: os.Getenv("REDIS_ADDR"),
Password: os.Getenv("REDIS_PASSWORD"),
})
cacheService = &services.CacheService{RedisClient: redisClient}
clickhouseClient, err := clickhouse.GetClickHouseClient()
if err != nil {
log.Fatalf(" ClickHouse : %v", err)
}
clickhouseService = &services.ClickhouseService{
DB: clickhouseClient.DB,
}
}
func runScrapper(wg *sync.WaitGroup, stop <-chan struct{}) {
defer wg.Done()
log.Println("Starting Scrapper...")
s, _ := scrapper.NewScrapper()
effThreshold, _ := strconv.Atoi(os.Getenv("EFFICIENCY_THRESHOLD"))
if os.Getenv("BATCH_SCRAPPING_START_CYCLE_ID") != "" && os.Getenv("BATCH_SCRAPPING_FINISH_CYCLE_ID") != "" {
startCycleId, _ := strconv.Atoi(os.Getenv("BATCH_SCRAPPING_START_CYCLE_ID"))
finishCycleId, _ := strconv.Atoi(os.Getenv("BATCH_SCRAPPING_FINISH_CYCLE_ID"))
log.Printf("Starting BATCH scrapping from %d to %d", startCycleId, finishCycleId)
if startCycleId > 0 && finishCycleId > startCycleId {
for cycleId := startCycleId; cycleId <= finishCycleId; cycleId += 65536 {
log.Println(fmt.Sprintf("Cycle id %d...", cycleId))
for fromTs := startCycleId; fromTs <= startCycleId+65535; fromTs += 600 {
toTs := fromTs + 60
log.Println(fmt.Sprintf("Cycle id %d: %d - %d..", cycleId, fromTs, toTs))
if err := s.ProcessCycles(stop, float64(effThreshold), &cycleId, fromTs, toTs, true); err != nil {
log.Fatalf(err.Error())
return
}
}
}
}
} else {
for {
if err := s.ProcessCycles(stop, float64(effThreshold), nil, int(time.Now().Add(-60).Unix()), int(time.Now().Unix()), false); err != nil {
log.Fatalf("Scrapper failed: %v", err)
return
}
}
}
log.Println("Scrapper finished successfully.")
}
func runNotifier(wg *sync.WaitGroup, stop <-chan struct{}) {
defer wg.Done()
log.Println("Starting Notifier...")
n, err := notifier.NewNotifier(clickhouseService, cacheService)
if err != nil {
log.Fatalf("Failed to initialize Notifier: %v", err)
}
n.ListenAndNotify(stop)
log.Println("Notifier finished successfully.")
}
func runBackend(wg *sync.WaitGroup, stop <-chan struct{}) {
defer wg.Done()
h := handlers.NewHandlers(clickhouseService, cacheService)
server := &http.Server{
Addr: ":3000",
Handler: nil,
}
http.Handle("/", http.FileServer(http.Dir("./static")))
http.HandleFunc("/api/chart", h.ChartHandler)
http.HandleFunc("/api/health", h.HealthHandler)
http.HandleFunc("/api/validator-statuses", h.ValidatorStatusesHandler)
serverErrChan := make(chan error, 1)
go func() {
log.Println("Backend started on :3000")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
serverErrChan <- err
}
}()
select {
case <-stop:
log.Println("Shutting down the backend server...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Fatalf("Backend server shutdown failed: %v", err)
} else {
log.Println("Backend server gracefully stopped")
}
case err := <-serverErrChan:
log.Fatalf("Backend server failed: %v", err)
}
}