-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathapplication.go
243 lines (225 loc) · 6.53 KB
/
application.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package appctl
import (
"context"
"errors"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
)
type (
// Resources is an abstraction representing application resources.
Resources interface {
// Init is executed before transferring control to MainFunc. Should initialize resources and check their
// minimum health. If an error is returned, MainFunc will not be started.
Init(context.Context) error
// Watch is executed in the background, monitors the state of resources.
// Exiting this procedure will immediately stop the application.
Watch(context.Context) error
// Stop signals the Watch procedure to terminate the work.
Stop()
// Release releases the resources. Executed just before exiting the Application.Run.
Release() error
}
Application struct {
// MainFunc will run as the main thread of execution when you execute the Run method.
// Termination of this function will result in the termination of Run, the error that was passed as a
// result will be thrown as a result of Run execution.
//
// The halt channel controls the runtime of the application, as soon as it closes, you need to gracefully
// complete all current tasks and exit the MainFunc.
MainFunc func(ctx context.Context, halt <-chan struct{}) error
// Resources is an abstraction that represents the resources needed to execute the main thread.
// The health of resources directly affects the main thread of execution.
Resources Resources
// TerminationTimeout limits the time for the main thread to terminate. On normal shutdown,
// if MainFunc does not return within the allotted time, the job will terminate with an ErrTermTimeout error.
TerminationTimeout time.Duration
// InitializationTimeout limits the time to initialize resources.
// If the resources are not initialized within the allotted time, the application will not be launched
InitializationTimeout time.Duration
appState int32
mux sync.Mutex
err error
halt chan struct{}
done chan struct{}
}
AppContext struct{}
)
const (
appStateInit int32 = iota
appStateRunning
appStateHalt
appStateShutdown
)
const (
defaultTerminationTimeout = time.Second
defaultInitializationTimeout = time.Second * 15
)
// Run starts the execution of the main application thread with the MainFunc function.
// Returns an error if the execution of the application ended abnormally, otherwise it will return a nil.
func (a *Application) Run() error {
if a.MainFunc == nil {
return ErrMainOmitted
}
if a.checkState(appStateInit, appStateRunning) {
if err := a.init(); err != nil {
a.err = err
a.appState = appStateShutdown
return err
}
var servicesRunning = make(chan struct{})
if a.Resources != nil {
go a.watchResources(servicesRunning)
}
sig := make(chan os.Signal, 5)
signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
// main thread execution
a.setError(a.run(sig))
// shutdown
if a.Resources != nil {
a.Resources.Stop()
select {
case <-servicesRunning:
case <-time.After(a.TerminationTimeout):
}
a.setError(a.Resources.Release())
}
return a.getError()
}
return ErrWrongState
}
func (a *Application) init() error {
if a.TerminationTimeout == 0 {
a.TerminationTimeout = defaultTerminationTimeout
}
if a.InitializationTimeout == 0 {
a.InitializationTimeout = defaultInitializationTimeout
}
a.halt = make(chan struct{})
a.done = make(chan struct{})
if a.Resources != nil {
ctx, cancel := context.WithTimeout(a, a.InitializationTimeout)
defer cancel()
return a.Resources.Init(ctx)
}
return nil
}
func (a *Application) watchResources(servicesRunning chan<- struct{}) {
defer close(servicesRunning)
// if the stop is due to the correct stop of services without any error,
// we still have to stop the application
defer a.Close()
a.setError(a.Resources.Watch(a))
}
func (a *Application) run(sig <-chan os.Signal) error {
defer a.Close()
var errRun = make(chan error, 1)
go func() {
defer close(errRun)
if err := a.MainFunc(a, a.halt); err != nil {
errRun <- err
}
}()
var errHld = make(chan error, 1)
go func() {
defer close(errHld)
select {
// wait for os signal
case <-sig:
a.Shutdown()
// In this mode, the main thread should stop accepting new requests, terminate all current requests, and exit.
// Exiting the procedure of the main thread will lead to an implicit call Close(),
// if this does not happen, we will make an explicit call through the shutdown timeout
select {
case <-time.After(a.TerminationTimeout):
errHld <- ErrTermTimeout
case <-a.done:
// ok
}
// if shutdown
case <-a.done:
// exit immediately
}
}()
select {
case err, ok := <-errRun:
if ok && err != nil {
return err
}
case err, ok := <-errHld:
if ok && err != nil {
return err
}
case <-a.done:
// shutdown
}
return nil
}
// Shutdown signals the application to terminate the current computational processes and prepare to stop the application.
func (a *Application) Shutdown() {
if a.checkState(appStateRunning, appStateHalt) {
close(a.halt)
}
}
// Close stops the application immediately. At this point, all calculations should be completed.
func (a *Application) Close() error {
a.Shutdown()
if a.checkState(appStateHalt, appStateShutdown) {
close(a.done)
}
return a.Err()
}
// Deadline returns the time when work done on behalf of this context should be canceled.
func (a *Application) Deadline() (deadline time.Time, ok bool) {
return time.Time{}, false
}
// Done returns a channel that's closed when work done on behalf of this context should be canceled.
func (a *Application) Done() <-chan struct{} {
return a.done
}
// Err returns error when application is closed.
// If Done is not yet closed, Err returns nil. If Done is closed, Err returns ErrShutdown.
func (a *Application) Err() error {
if err := a.getError(); err != nil {
return err
}
if atomic.LoadInt32(&a.appState) == appStateShutdown {
return ErrShutdown
}
return nil
}
// Value returns the Application object.
func (a *Application) Value(key interface{}) interface{} {
var appContext = AppContext{}
if key == appContext {
return a
}
return nil
}
func (a *Application) getError() error {
var err error
a.mux.Lock()
err = a.err
a.mux.Unlock()
return err
}
func (a *Application) setError(err error) {
if err == nil {
return
}
if errors.Is(err, ErrShutdown) {
return
}
a.mux.Lock()
if a.err == nil {
a.err = err
}
a.mux.Unlock()
a.Close()
}
func (a *Application) checkState(old, new int32) bool {
return atomic.CompareAndSwapInt32(&a.appState, old, new)
}