Skip to content

Commit bab3e15

Browse files
committed
Add reverse plugin.
1 parent 45df46c commit bab3e15

File tree

4 files changed

+793
-1
lines changed

4 files changed

+793
-1
lines changed

rpc/plugins/reverse/caller.go

+338
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
/*--------------------------------------------------------*\
2+
| |
3+
| hprose |
4+
| |
5+
| Official WebSite: https://hprose.com |
6+
| |
7+
| rpc/plugins/reverse/caller.go |
8+
| |
9+
| LastModified: May 19, 2021 |
10+
| Author: Ma Bingyao <[email protected]> |
11+
| |
12+
\*________________________________________________________*/
13+
14+
package reverse
15+
16+
import (
17+
"context"
18+
"errors"
19+
"reflect"
20+
"sync"
21+
"sync/atomic"
22+
"time"
23+
24+
"github.com/hprose/hprose-golang/v3/io"
25+
"github.com/hprose/hprose-golang/v3/rpc/core"
26+
"github.com/modern-go/reflect2"
27+
cmap "github.com/orcaman/concurrent-map"
28+
)
29+
30+
type call [3]interface{}
31+
32+
func newCall(index int, name string, args []interface{}) (c call) {
33+
c[0] = index
34+
c[1] = name
35+
c[2] = args
36+
return
37+
}
38+
39+
func (c call) Value() (index int, name string, args []interface{}) {
40+
return c[0].(int), c[1].(string), c[2].([]interface{})
41+
}
42+
43+
type callCache struct {
44+
c []call
45+
sync.Mutex
46+
}
47+
48+
func (cc *callCache) Append(c call) {
49+
cc.Lock()
50+
defer cc.Unlock()
51+
cc.c = append(cc.c, c)
52+
}
53+
54+
func (cc *callCache) Delete(index int) {
55+
cc.Lock()
56+
defer cc.Unlock()
57+
for i := 0; i < len(cc.c); i++ {
58+
if cc.c[i][0].(int) == index {
59+
cc.c = append(cc.c[:i], cc.c[i+1:]...)
60+
return
61+
}
62+
}
63+
}
64+
65+
func (cc *callCache) Take() (calls []call) {
66+
cc.Lock()
67+
defer cc.Unlock()
68+
calls = cc.c
69+
cc.c = nil
70+
return
71+
}
72+
73+
type returnValue [3]interface{}
74+
75+
func newReturnValue(index int, result interface{}, err string) (r returnValue) {
76+
r[0] = index
77+
r[1] = result
78+
r[2] = err
79+
return
80+
}
81+
82+
func (r returnValue) Index() int {
83+
return r[0].(int)
84+
}
85+
86+
func (r returnValue) Value(returnType []reflect.Type) ([]interface{}, error) {
87+
err := r[2].(string)
88+
if err != "" {
89+
return nil, errors.New(err)
90+
}
91+
n := len(returnType)
92+
switch n {
93+
case 0:
94+
return nil, nil
95+
case 1:
96+
if result, err := io.Convert(r[1], returnType[0]); err != nil {
97+
return nil, err
98+
} else {
99+
return []interface{}{result}, nil
100+
}
101+
default:
102+
results := make([]interface{}, n)
103+
values := r[1].([]interface{})
104+
count := len(values)
105+
for i := 0; i < n && i < count; i++ {
106+
if result, err := io.Convert(values[i], returnType[i]); err != nil {
107+
return nil, err
108+
} else {
109+
results[i] = result
110+
}
111+
}
112+
for i := count; i < n; i++ {
113+
t := reflect2.Type2(returnType[i])
114+
results[i] = t.Indirect(t.New())
115+
}
116+
return results, nil
117+
}
118+
}
119+
120+
type resultMap struct {
121+
results map[int]chan returnValue
122+
sync.Mutex
123+
}
124+
125+
func newResultMap() *resultMap {
126+
return &resultMap{
127+
results: make(map[int]chan returnValue),
128+
}
129+
}
130+
131+
func (m *resultMap) GetAndDelete(index int) chan returnValue {
132+
m.Lock()
133+
defer m.Unlock()
134+
if result, ok := m.results[index]; ok {
135+
delete(m.results, index)
136+
return result
137+
}
138+
return nil
139+
}
140+
141+
func (m *resultMap) Delete(index int) {
142+
m.Lock()
143+
defer m.Unlock()
144+
delete(m.results, index)
145+
}
146+
147+
func (m *resultMap) Set(index int, result chan returnValue) {
148+
m.Lock()
149+
defer m.Unlock()
150+
m.results[index] = result
151+
}
152+
153+
var (
154+
emptyArgs = make([]interface{}, 0)
155+
emptyCall = make([]call, 0)
156+
)
157+
158+
type Caller struct {
159+
*core.Service
160+
HeartBeat time.Duration
161+
Timeout time.Duration
162+
calls cmap.ConcurrentMap
163+
results cmap.ConcurrentMap
164+
responders cmap.ConcurrentMap
165+
onlines cmap.ConcurrentMap
166+
counter int32
167+
}
168+
169+
func NewCaller(service *core.Service) *Caller {
170+
caller := &Caller{
171+
Service: service,
172+
HeartBeat: time.Minute * 2,
173+
Timeout: time.Second * 30,
174+
calls: cmap.New(),
175+
results: cmap.New(),
176+
responders: cmap.New(),
177+
onlines: cmap.New(),
178+
}
179+
service.Use(caller.handler).
180+
AddFunction(caller.close, "!!").
181+
AddFunction(caller.begin, "!").
182+
AddFunction(caller.end, "=")
183+
return caller
184+
}
185+
186+
func (c *Caller) ID(ctx context.Context) (id string) {
187+
if id = core.GetServiceContext(ctx).RequestHeaders().GetString("id"); id == "" {
188+
panic("client unique id not found")
189+
}
190+
return
191+
}
192+
193+
func (c *Caller) send(id string, responder chan []call) bool {
194+
if calls, ok := c.calls.Get(id); ok {
195+
calls := calls.(*callCache).Take()
196+
if len(calls) == 0 {
197+
return false
198+
}
199+
responder <- calls
200+
return true
201+
}
202+
return false
203+
}
204+
205+
func (c *Caller) response(id string) {
206+
if responder, ok := c.responders.Pop(id); ok {
207+
responder := responder.(chan []call)
208+
if !c.send(id, responder) {
209+
if c.responders.SetIfAbsent(id, responder) {
210+
responder <- nil
211+
}
212+
}
213+
}
214+
}
215+
216+
func (c *Caller) stop(ctx context.Context) string {
217+
id := c.ID(ctx)
218+
if responder, ok := c.responders.Pop(id); ok {
219+
responder.(chan []call) <- nil
220+
}
221+
return id
222+
}
223+
224+
func (c *Caller) close(ctx context.Context) {
225+
id := c.stop(ctx)
226+
c.onlines.Remove(id)
227+
}
228+
229+
func (c *Caller) begin(ctx context.Context) []call {
230+
id := c.stop(ctx)
231+
c.onlines.Set(id, true)
232+
responder := make(chan []call, 1)
233+
if !c.send(id, responder) {
234+
c.responders.Upsert(id, responder, func(exist bool, valueInMap interface{}, newValue interface{}) interface{} {
235+
if exist {
236+
valueInMap.(chan []call) <- nil
237+
}
238+
return newValue
239+
})
240+
if c.HeartBeat > 0 {
241+
ctx, cancel := context.WithTimeout(ctx, c.HeartBeat)
242+
defer cancel()
243+
select {
244+
case <-ctx.Done():
245+
responder <- emptyCall
246+
case result := <-responder:
247+
return result
248+
}
249+
}
250+
}
251+
return <-responder
252+
}
253+
254+
func (c *Caller) end(ctx context.Context, results []returnValue) {
255+
id := c.ID(ctx)
256+
for _, rv := range results {
257+
if r, ok := c.results.Get(id); ok {
258+
if value := r.(*resultMap).GetAndDelete(rv.Index()); value != nil {
259+
value <- rv
260+
}
261+
}
262+
}
263+
}
264+
265+
func (c *Caller) Invoke(id string, name string, args []interface{}, returnType ...reflect.Type) ([]interface{}, error) {
266+
return c.InvokeContext(context.Background(), id, name, args, returnType...)
267+
}
268+
269+
func (c *Caller) InvokeContext(ctx context.Context, id string, name string, args []interface{}, returnType ...reflect.Type) ([]interface{}, error) {
270+
if args == nil {
271+
args = emptyArgs
272+
}
273+
index := int(atomic.AddInt32(&c.counter, 1) & 0x7fffffff)
274+
var calls *callCache
275+
if cc, ok := c.calls.Get(id); ok {
276+
calls = cc.(*callCache)
277+
} else {
278+
calls = new(callCache)
279+
if !c.calls.SetIfAbsent(id, calls) {
280+
cc, _ := c.calls.Get(id)
281+
calls = cc.(*callCache)
282+
}
283+
}
284+
calls.Append(newCall(index, name, args))
285+
var results *resultMap
286+
if rm, ok := c.results.Get(id); ok {
287+
results = rm.(*resultMap)
288+
} else {
289+
results = newResultMap()
290+
if !c.results.SetIfAbsent(id, results) {
291+
rm, _ := c.results.Get(id)
292+
results = rm.(*resultMap)
293+
}
294+
}
295+
result := make(chan returnValue, 1)
296+
results.Set(index, result)
297+
c.response(id)
298+
if c.Timeout > 0 {
299+
ctx, cancel := context.WithTimeout(ctx, c.HeartBeat)
300+
defer cancel()
301+
select {
302+
case <-ctx.Done():
303+
calls.Delete(index)
304+
results.Delete(index)
305+
return nil, core.ErrTimeout
306+
case result := <-result:
307+
return result.Value(returnType)
308+
}
309+
}
310+
return (<-result).Value(returnType)
311+
}
312+
313+
func (c *Caller) UseService(remoteService interface{}, id string, namespace ...string) {
314+
ns := ""
315+
if len(namespace) > 0 {
316+
ns = namespace[0]
317+
}
318+
core.Proxy.Build(remoteService, invocation{caller: c, id: id, namespace: ns}.Invoke)
319+
}
320+
321+
func (c *Caller) Exists(id string) bool {
322+
return c.onlines.Has(id)
323+
}
324+
325+
func (c *Caller) IdList() []string {
326+
return c.onlines.Keys()
327+
}
328+
329+
func (c *Caller) handler(ctx context.Context, name string, args []interface{}, next core.NextInvokeHandler) (result []interface{}, err error) {
330+
core.GetServiceContext(ctx).Items().Set("caller", c)
331+
return next(ctx, name, args)
332+
}
333+
334+
func UseService(ctx context.Context, remoteService interface{}, namespace ...string) *Caller {
335+
caller := core.GetServiceContext(ctx).Items().GetInterface("caller").(*Caller)
336+
caller.UseService(remoteService, caller.ID(ctx), namespace...)
337+
return caller
338+
}

0 commit comments

Comments
 (0)