-
Notifications
You must be signed in to change notification settings - Fork 208
/
Copy pathdataconverter.go
135 lines (119 loc) · 3.26 KB
/
dataconverter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package pso
import (
"bytes"
"encoding/json"
"fmt"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/sdk/converter"
)
// jsonDataConverter is a converter.DataConverter implementation that uses JSON
// to serialize Swarm and Particle values.
// WARNING: every struct member must be exported (start with a capital letter);
// otherwise it is not serialized.
// TODO: consider storing blobs in an external DB or S3.
type jsonDataConverter struct{}
// NewJSONDataConverter returns a new JSON-based data converter, exposed through
// the converter.DataConverter interface.
func NewJSONDataConverter() converter.DataConverter {
	return new(jsonDataConverter)
}
// ToPayloads converts every argument, in order, into a payload via ToPayload and
// collects the results. The first conversion failure aborts the call and is
// returned unchanged.
func (dc *jsonDataConverter) ToPayloads(value ...interface{}) (*commonpb.Payloads, error) {
	// TODO: store payloads in DB/S3 and return a converter key instead.
	result := new(commonpb.Payloads)
	for idx := range value {
		encoded, err := dc.ToPayload(value[idx])
		if err != nil {
			return nil, err
		}
		result.Payloads = append(result.Payloads, encoded)
	}
	return result, nil
}
// ToPayload serializes a single value into a payload whose data is a stream of
// JSON documents and whose "encoding" metadata entry is "raw".
//
// The layout of the data depends on the dynamic type of value:
//   - Swarm: the settings, then the global best, then one document per particle
//     (a nil particle is written as a zero-valued Particle). Particles are only
//     written when Settings.Size is positive.
//   - WorkflowResult: Msg followed by Success.
//   - anything else: a single document produced by encoding/json.
//
// A non-nil error wraps the underlying cause. It is returned when encoding
// fails, or when a Swarm has nil Settings or Gbest.
func (dc *jsonDataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)

	var err error
	switch t := value.(type) {
	case Swarm:
		err = dc.encodeSwarm(enc, &t)
	case WorkflowResult:
		if err = enc.Encode(t.Msg); err == nil {
			err = enc.Encode(t.Success)
		}
	default:
		err = enc.Encode(value)
	}
	if err != nil {
		return nil, fmt.Errorf(
			"unable to encode argument: %T, with error: %w", value, err)
	}

	return &commonpb.Payload{
		Metadata: map[string][]byte{
			"encoding": []byte("raw"),
		},
		Data: buf.Bytes(),
	}, nil
}

// encodeSwarm writes the JSON document stream for s to enc and stops at the
// first encoding error, so an early failure is never masked by a later success.
func (dc *jsonDataConverter) encodeSwarm(enc *json.Encoder, s *Swarm) error {
	// Both pointers are dereferenced below; report nil as an error, not a panic.
	if s.Settings == nil {
		return fmt.Errorf("swarm settings are nil")
	}
	if s.Gbest == nil {
		return fmt.Errorf("swarm global best is nil")
	}

	if err := enc.Encode(*s.Settings); err != nil {
		return err
	}
	if err := enc.Encode(*s.Gbest); err != nil {
		return err
	}
	if s.Settings.Size <= 0 {
		return nil
	}
	// NOTE(review): FromPayload reads exactly Settings.Size particles, so this
	// assumes len(s.Particles) == s.Settings.Size — confirm against callers.
	for _, p := range s.Particles {
		if p == nil {
			p = new(Particle)
		}
		if err := enc.Encode(*p); err != nil {
			return err
		}
	}
	return nil
}
// FromPayloads decodes payloads positionally into valuePtrs using FromPayload.
//
// A nil payloads is a no-op. Payloads beyond the number of supplied value
// pointers are ignored instead of indexing past the end of valuePtrs. The first
// decoding error stops processing and is returned unchanged.
func (dc *jsonDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
	if payloads == nil {
		return nil
	}
	// TODO: convert payloads into key in DB/S3 and retrieve actual payloads from DB/S3
	for i, payload := range payloads.Payloads {
		if i >= len(valuePtrs) {
			// No destination left for the remaining payloads.
			break
		}
		if err := dc.FromPayload(payload, valuePtrs[i]); err != nil {
			return err
		}
	}
	return nil
}
// FromPayload decodes the JSON document stream in payload into valuePtr.
//
// The expected layout depends on the dynamic type of valuePtr:
//   - *Swarm: the settings, then the global best, then Settings.Size particles.
//   - *WorkflowResult: Msg followed by Success.
//   - anything else: a single document decoded by encoding/json.
//
// A non-nil error wraps the underlying cause. For a *Swarm, decoding stops at
// the first failure, so the target may be left partially populated.
func (dc *jsonDataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error {
	dec := json.NewDecoder(bytes.NewReader(payload.GetData()))

	var err error
	switch t := valuePtr.(type) {
	case *Swarm:
		err = dc.decodeSwarm(dec, t)
	case *WorkflowResult:
		if err = dec.Decode(&t.Msg); err == nil {
			err = dec.Decode(&t.Success)
		}
	default:
		err = dec.Decode(valuePtr)
	}
	if err != nil {
		return fmt.Errorf(
			"unable to decode argument: %T, with error: %w", valuePtr, err)
	}
	return nil
}

// decodeSwarm reads the settings, global best and particles of s from dec, in
// that order, returning the first error encountered.
func (dc *jsonDataConverter) decodeSwarm(dec *json.Decoder, s *Swarm) error {
	s.Settings = new(SwarmSettings)
	if err := dec.Decode(s.Settings); err != nil {
		return err
	}
	// Size comes from the payload; a negative value would make the slice
	// allocation below panic.
	if s.Settings.Size < 0 {
		return fmt.Errorf("negative swarm size: %d", s.Settings.Size)
	}

	// The function field is unexported and therefore not part of the JSON; it
	// is rebuilt from the decoded function name.
	s.Settings.function = FunctionFactory(s.Settings.FunctionName)
	s.Gbest = NewPosition(s.Settings.function.dim)
	if err := dec.Decode(s.Gbest); err != nil {
		return err
	}

	s.Particles = make([]*Particle, s.Settings.Size)
	for i := range s.Particles {
		s.Particles[i] = new(Particle)
		if err := dec.Decode(s.Particles[i]); err != nil {
			return err
		}
	}
	return nil
}
// ToString returns the payload's data as a string with surrounding whitespace
// trimmed, for display purposes. A nil payload yields the empty string.
//
// NOTE(review): this assumes the payload was produced by ToPayload, whose data
// is JSON text — confirm no binary payloads reach this converter.
func (dc *jsonDataConverter) ToString(payload *commonpb.Payload) string {
	return string(bytes.TrimSpace(payload.GetData()))
}
// ToStrings returns one string per payload, in order, each produced by
// ToString. A nil payloads yields a nil slice.
func (dc *jsonDataConverter) ToStrings(payloads *commonpb.Payloads) []string {
	if payloads == nil {
		return nil
	}
	result := make([]string, 0, len(payloads.Payloads))
	for _, p := range payloads.Payloads {
		result = append(result, dc.ToString(p))
	}
	return result
}