@@ -4,11 +4,14 @@ import (
4
4
"context"
5
5
"errors"
6
6
"net/http"
7
+ "os"
7
8
8
9
cloudevents "github.com/cloudevents/sdk-go/v2"
9
10
"k8s.io/klog/v2"
10
11
11
12
ofctx "github.com/OpenFunction/functions-framework-go/context"
13
+ "github.com/OpenFunction/functions-framework-go/internal/functions"
14
+ "github.com/OpenFunction/functions-framework-go/internal/registry"
12
15
"github.com/OpenFunction/functions-framework-go/plugin"
13
16
plgExample "github.com/OpenFunction/functions-framework-go/plugin/plugin-example"
14
17
"github.com/OpenFunction/functions-framework-go/plugin/skywalking"
@@ -23,6 +26,7 @@ type functionsFrameworkImpl struct {
23
26
postPlugins []plugin.Plugin
24
27
pluginMap map [string ]plugin.Plugin
25
28
runtime runtime.Interface
29
+ registry * registry.Registry
26
30
}
27
31
28
32
// Framework is the interface for the function conversion.
@@ -36,6 +40,9 @@ type Framework interface {
36
40
func NewFramework () (* functionsFrameworkImpl , error ) {
37
41
fwk := & functionsFrameworkImpl {}
38
42
43
+ // Set the function registry
44
+ fwk .registry = registry .Default ()
45
+
39
46
// Parse OpenFunction FunctionContext
40
47
if ctx , err := ofctx .GetRuntimeContext (); err != nil {
41
48
klog .Errorf ("failed to parse OpenFunction FunctionContext: %v\n " , err )
@@ -59,17 +66,29 @@ func NewFramework() (*functionsFrameworkImpl, error) {
59
66
60
67
func (fwk * functionsFrameworkImpl ) Register (ctx context.Context , fn interface {}) error {
61
68
if fnHTTP , ok := fn .(func (http.ResponseWriter , * http.Request )); ok {
62
- if err := fwk .runtime .RegisterHTTPFunction (fwk .funcContext , fwk .prePlugins , fwk .postPlugins , fnHTTP ); err != nil {
69
+ rf , err := functions .New (functions .WithFunctionName (fwk .funcContext .GetName ()), functions .WithHTTP (fnHTTP ), functions .WithFunctionPath (fwk .funcContext .GetHttpPattern ()))
70
+ if err != nil {
71
+ klog .Errorf ("failed to register function: %v" , err )
72
+ }
73
+ if err := fwk .runtime .RegisterHTTPFunction (fwk .funcContext , fwk .prePlugins , fwk .postPlugins , rf ); err != nil {
63
74
klog .Errorf ("failed to register function: %v" , err )
64
75
return err
65
76
}
66
77
} else if fnOpenFunction , ok := fn .(func (ofctx.Context , []byte ) (ofctx.Out , error )); ok {
67
- if err := fwk .runtime .RegisterOpenFunction (fwk .funcContext , fwk .prePlugins , fwk .postPlugins , fnOpenFunction ); err != nil {
78
+ rf , err := functions .New (functions .WithFunctionName (fwk .funcContext .GetName ()), functions .WithOpenFunction (fnOpenFunction ), functions .WithFunctionPath (fwk .funcContext .GetHttpPattern ()))
79
+ if err != nil {
80
+ klog .Errorf ("failed to register function: %v" , err )
81
+ }
82
+ if err := fwk .runtime .RegisterOpenFunction (fwk .funcContext , fwk .prePlugins , fwk .postPlugins , rf ); err != nil {
68
83
klog .Errorf ("failed to register function: %v" , err )
69
84
return err
70
85
}
71
86
} else if fnCloudEvent , ok := fn .(func (context.Context , cloudevents.Event ) error ); ok {
72
- if err := fwk .runtime .RegisterCloudEventFunction (ctx , fwk .funcContext , fwk .prePlugins , fwk .postPlugins , fnCloudEvent ); err != nil {
87
+ rf , err := functions .New (functions .WithFunctionName (fwk .funcContext .GetName ()), functions .WithCloudEvent (fnCloudEvent ), functions .WithFunctionPath (fwk .funcContext .GetHttpPattern ()))
88
+ if err != nil {
89
+ klog .Errorf ("failed to register function: %v" , err )
90
+ }
91
+ if err := fwk .runtime .RegisterCloudEventFunction (ctx , fwk .funcContext , fwk .prePlugins , fwk .postPlugins , rf ); err != nil {
73
92
klog .Errorf ("failed to register function: %v" , err )
74
93
return err
75
94
}
@@ -82,6 +101,56 @@ func (fwk *functionsFrameworkImpl) Register(ctx context.Context, fn interface{})
82
101
}
83
102
84
103
func (fwk * functionsFrameworkImpl ) Start (ctx context.Context ) error {
104
+
105
+ target := os .Getenv ("FUNCTION_TARGET" )
106
+
107
+ // if FUNCTION_TARGET is provided
108
+ if len (target ) > 0 {
109
+ if fn , ok := fwk .registry .GetRegisteredFunction (target ); ok {
110
+ klog .Infof ("registering function: %s on path: %s" , target , fn .GetPath ())
111
+ switch fn .GetFunctionType () {
112
+ case functions .HTTPType :
113
+ fwk .Register (ctx , fn .GetHTTPFunction ())
114
+ case functions .CloudEventType :
115
+ fwk .Register (ctx , fn .GetCloudEventFunction ())
116
+ case functions .OpenFunctionType :
117
+ fwk .Register (ctx , fn .GetOpenFunctionFunction ())
118
+ }
119
+ } else {
120
+ klog .Errorf ("function not found: %s" , target )
121
+ }
122
+ } else {
123
+ // if FUNCTION_TARGET is not provided but user uses declarative function, by default all registered functions will be deployed.
124
+ funcNames := fwk .registry .GetFunctionNames ()
125
+ if len (funcNames ) > 1 && fwk .funcContext .GetRuntime () == ofctx .Async {
126
+ return errors .New ("only one function is allowed in async runtime" )
127
+ } else if len (funcNames ) > 0 {
128
+ klog .Info ("no 'FUNCTION_TARGET' is provided, register all the functions in the registry" )
129
+ for _ , name := range funcNames {
130
+ if rf , ok := fwk .registry .GetRegisteredFunction (name ); ok {
131
+ klog .Infof ("registering function: %s on path: %s" , rf .GetName (), rf .GetPath ())
132
+ switch rf .GetFunctionType () {
133
+ case functions .HTTPType :
134
+ if err := fwk .runtime .RegisterHTTPFunction (fwk .funcContext , fwk .prePlugins , fwk .postPlugins , rf ); err != nil {
135
+ klog .Errorf ("failed to register function: %v" , err )
136
+ return err
137
+ }
138
+ case functions .CloudEventType :
139
+ if err := fwk .runtime .RegisterCloudEventFunction (ctx , fwk .funcContext , fwk .prePlugins , fwk .postPlugins , rf ); err != nil {
140
+ klog .Errorf ("failed to register function: %v" , err )
141
+ return err
142
+ }
143
+ case functions .OpenFunctionType :
144
+ if err := fwk .runtime .RegisterOpenFunction (fwk .funcContext , fwk .prePlugins , fwk .postPlugins , rf ); err != nil {
145
+ klog .Errorf ("failed to register function: %v" , err )
146
+ return err
147
+ }
148
+ }
149
+ }
150
+ }
151
+ }
152
+ }
153
+
85
154
err := fwk .runtime .Start (ctx )
86
155
if err != nil {
87
156
klog .Error ("failed to start runtime service" )
@@ -136,13 +205,12 @@ func createRuntime(fwk *functionsFrameworkImpl) error {
136
205
rt := fwk .funcContext .GetRuntime ()
137
206
port := fwk .funcContext .GetPort ()
138
207
pattern := fwk .funcContext .GetHttpPattern ()
139
-
140
208
switch rt {
141
209
case ofctx .Knative :
142
210
fwk .runtime = knative .NewKnativeRuntime (port , pattern )
143
211
return nil
144
212
case ofctx .Async :
145
- fwk .runtime , err = async .NewAsyncRuntime (port )
213
+ fwk .runtime , err = async .NewAsyncRuntime (port , pattern )
146
214
if err != nil {
147
215
return err
148
216
}
0 commit comments