-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfunction.go
124 lines (106 loc) · 3.39 KB
/
function.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package cloudtasks
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"reflect"
"runtime"
"testing"
)
var (
// funcs is the registry of all delayed functions, indexed by the key they
// were registered with through Func.
funcs = make(map[string]*Function)
// Precomputed reflect types of the context.Context and error interfaces.
// NOTE(review): no code visible in this part of the file reads them; confirm
// where they are used before touching them.
contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
errorType = reflect.TypeOf((*error)(nil)).Elem()
)
// TaskFn should be implemented by any task function. It receives a context
// and the task being handled, and returns an error.
type TaskFn func(ctx context.Context, task *Task) error
// Function is a stored task implementation. Build it with Func, which also
// registers it in the package registry.
type Function struct {
// key is the name the function was registered with.
key string
// fn is the handler that runs the task.
fn TaskFn
// err is set by Func when another function is later registered with the
// same key. Task returns it instead of building a task.
err error
}
// Func creates the task implementation for key and stores it in the package
// registry, returning the new Function.
//
// When the key was already taken, the previously registered Function is marked
// with an error naming the key and the file of the caller, and the new
// Function replaces it in the registry.
func Func(key string, fn TaskFn) *Function {
	// runtime.Caller(1) has to stay in this function: the skip count is
	// relative to Func so that it resolves to the code registering the task.
	if previous := funcs[key]; previous != nil {
		_, file, _, _ := runtime.Caller(1)
		previous.err = fmt.Errorf("cloudtasks: multiple functions registered for %s in %s", key, file)
	}

	registered := &Function{key: key, fn: fn}
	funcs[key] = registered
	return registered
}
// Task builds a task invocation of the function without sending it. The
// result can be sent later in batches with queue.SendTasks(); use Call() to
// build and send in a single step.
//
// The payload is serialized to JSON with encoding/json, so it can be any value
// that package is able to marshal. The handler reads it back from the task.
//
// It returns the registration error of the function if there is one, an error
// if the payload cannot be marshaled, or an error if the serialized payload is
// longer than 95KiB.
func (f *Function) Task(payload interface{}, opts ...TaskOption) (*Task, error) {
	if f.err != nil {
		return nil, f.err
	}

	built := &Task{key: f.key}
	for i := range opts {
		opts[i](built)
	}

	encoded, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("cloudtasks: cannot marshal task payload %T: %w", payload, err)
	}
	built.payload = encoded

	// Cloud Tasks enforces a 100KiB limit on every task. Failing a bit earlier
	// lets the error message carry the size and the start of the payload.
	const maxPayloadSize = 95 * 1024
	if size := len(encoded); size > maxPayloadSize {
		// The payload is longer than the limit, so the 100 byte prefix always exists.
		return nil, fmt.Errorf("cloudtasks: task has a payload longer than 95KiB: %d bytes: %v ...", size, string(encoded[:100]))
	}

	return built, nil
}
// Call builds a task invocation and sends it individually to the queue right
// away, discarding the built task.
//
// If you are going to send multiple tasks at the same time, it is more
// efficient to build all of them with Task() first and then send them in
// batches with queue.SendTasks(). When sending a single task this function
// performs similarly to the batch method.
func (f *Function) Call(ctx context.Context, queue Queue, payload interface{}, opts ...TaskOption) error {
	if _, err := f.CallTask(ctx, queue, payload, opts...); err != nil {
		return err
	}
	return nil
}
// CallTask builds a task invocation, sends it to the queue and returns the
// built task.
//
// If the task cannot be built, it returns a nil task and the build error. If
// sending fails, the built task is still returned along with the send error.
func (f *Function) CallTask(ctx context.Context, queue Queue, payload interface{}, opts ...TaskOption) (*Task, error) {
	built, err := f.Task(payload, opts...)
	if err != nil {
		return nil, err
	}

	sendErr := queue.Send(ctx, built)
	return built, sendErr
}
// TestCall invokes the handler directly, with no queue involved, passing it a
// task that carries the serialized payload and a random 10 character name.
//
// The payload goes through Task() first, so marshaling and size errors are
// returned the same way. The testing argument is not used by the function; it
// is required so that the call can only be made from tests.
func (f *Function) TestCall(t *testing.T, payload interface{}) error {
	built, err := f.Task(payload)
	if err != nil {
		return err
	}

	incoming := new(Task)
	incoming.key = f.key
	incoming.name = generateRandomString(10)
	incoming.payload = built.payload

	return f.fn(context.Background(), incoming)
}
// generateRandomString returns a string of n characters, each one picked with
// math/rand from the ASCII letters and digits.
func generateRandomString(n int) string {
	const alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

	// Every character of the alphabet is a single byte, so the result can be
	// assembled as bytes.
	out := make([]byte, n)
	for i := 0; i < n; i++ {
		out[i] = alphabet[rand.Intn(len(alphabet))]
	}
	return string(out)
}