Skip to content

Commit

Permalink
Update rule to support http (#20)
Browse files Browse the repository at this point in the history
Co-authored-by: hanpengfei01 <[email protected]>
  • Loading branch information
hannatao and hanpengfei01 authored Feb 14, 2023
1 parent 2a0ce1c commit 9d28182
Show file tree
Hide file tree
Showing 18 changed files with 954 additions and 190 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v1
with:
go-version: 1.13
go-version: 1.18
- name: Checkout code
uses: actions/checkout@v1
- name: Run unittest
Expand All @@ -20,4 +20,4 @@ jobs:
uses: codecov/codecov-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: ./coverage.txt
file: ./coverage.txt
2 changes: 1 addition & 1 deletion .github/workflows/unittest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v1
with:
go-version: 1.13
go-version: 1.18
- name: Checkout code
uses: actions/checkout@v1
- name: Build
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$TARGETPLATFORM golang:1.13.5-stretch as devel
FROM --platform=$TARGETPLATFORM golang:1.18.3-stretch as devel
ARG BUILD_ARGS
COPY / /go/src/
RUN cd /go/src/ && make build-local BUILD_ARGS=$BUILD_ARGS
Expand Down
106 changes: 85 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ baetyl-rule
Baetyl-Rule 可以实现 Baetyl 框架端侧的消息流转,在 [Baetyl-Broker](https://github.com/baetyl/baetyl-broker)(端侧消息中心)、函数服务、Iot Hub(云端 Mqtt Broker) 进行消息交换。

支持以下的消息流转方式:
- 订阅 Baetyl-Broker 的消息主题,发送到自身的其他消息主题,支持函数处理
- 订阅 Baetyl-Broker 的消息主题,发送到其他消息节点(比如Iot Hub)的消息主题,支持函数处理
- 订阅其他消息节点的消息主题,发送到 Baetyl-Broker 的消息主题,支持函数处理
- 订阅来自mqtt的消息,开放openapi,接受来自http的消息。
- 调用函数计算(此步骤可以省略)
- 将处理后的结果发送至mqtt或调用http服务

其中 Baetyl 支持 Python、Node、Sql 等多种运行时,可以配置相关的脚本函数对消息进行过滤、处理、转换以及丰富等,具体可以参考 [Baetyl-Function](https://github.com/baetyl/baetyl-function) 模块。

Baetyl-Rule 的全量配置文件如下,并对配置字段做了相应解释:

```yaml
clients: # 消息节点,可以从节点订阅消息,也可以发送消息节点
clients: # 消息节点,可以从消息节点订阅消息,也可以发送至消息节点
- name: iothub # 名称
kind: mqtt # mqtt 类型
address: 'ssl://u7isgiz.mqtt.iot.bj.baidubce.com:1884' # 地址
Expand All @@ -29,24 +29,88 @@ clients: # 消息节点,可以从节点订阅消息,也可以发送消息节
key: ../example/var/lib/baetyl/testcert/client.key # 连接节点的私钥
cert: ../example/var/lib/baetyl/testcert/client.pem # 连接节点的公钥
insecureSkipVerify: true # 是否跳过服务端证书校验
rules: # 消息规则
- name: rule1 # 规则名称,必须保持唯一
source: # 消息源
- name: http-client # 名称
kind: http # http类型(仅可配置为target)
address: 'http://127.0.0.1:8554' # http服务地址,如果是https,请配置证书,否则默认使用系统证书
ca: ../example/var/lib/baetyl/testcert/ca.pem # 连接节点的 CA
key: ../example/var/lib/baetyl/testcert/client.key # 连接节点的私钥
cert: ../example/var/lib/baetyl/testcert/client.pem # 连接节点的公钥
insecureSkipVerify: true # 是否跳过服务端证书校验
- name: http-server # 名称
kind: http-server # http-server类型(仅可配置为source)
port: 8090 # http server服务端口,下面的tls可选,不配置时,默认为http服务
ca: ../example/var/lib/baetyl/testcert/ca.pem # 服务器的 CA
key: ../example/var/lib/baetyl/testcert/client.key # 服务器的私钥
cert: ../example/var/lib/baetyl/testcert/client.pem # 服务器的公钥
insecureSkipVerify: true # 是否跳过服务端证书校验
rules: # 消息规则
- name: rule1 # 规则名称,必须保持唯一
source: # 消息源
topic: broker/topic1 # 消息主题
qos: 1 # 消息质量
target: # 消息目的地
client: iotcore # 消息节点,如果不设置,默认为 baetyl-broker
topic: iotcore/topic2 # 消息主题
qos: 0 # 消息质量
- name: rule2 # 规则名称,必须保持唯一
source: # 消息源
qos: 1 # 消息质量
target: # 消息目的地
client: iothub # 消息节点,如果不设置,默认为 baetyl-broker
topic: iothub/topic2 # 消息主题
qos: 0 # 消息质量
function: # 处理函数
name: node85 # 函数名称
- name: rule2 # 规则名称,必须保持唯一
source: # 消息源
topic: broker/topic5 # 消息主题
qos: 0 # 消息质量
target: # 消息目的地
topic: broker/topic6 # 消息主题
qos: 1 # 消息质量
function: # 处理函数
name: node85 # 函数名称
qos: 0 # 消息质量
target: # 消息目的地
client: http-client # 与clients中配置的http服务名称一致
path: /nodes/test # http访问路径
method: PUT # http消息类型,支持GET/POST/PUT/DELETE
- name: rule3 # 规则名称,必须保持唯一
source: # 消息源
client: http-server # 指定为http-server的消息源
target: # 消息目的地
client: http-client # 与clients中配置的http服务名称一致
path: /nodes/test # http访问路径
method: PUT # http消息类型,支持GET/POST/PUT/DELETE
```
其中,Baetyl-Rule 会自动在节点信息列表中添加 Baetyl-Broker 的节点信息。当一条规则的的 client 字段未配置时,会默认设置为 Baetyl-Broker,从 Baetyl-Broker 订阅消息或者转发消息到 Baetyl-Broker。
说明:
- baetyl-rule 后台默认添加边缘系统应用baetyl-broker作为一个消息节点
- 当一条rule规则的source/target 未配置 client 字段时,会默认使用 baetyl-broker 作为其消息节点
- 当一个client消息节点的kind为http 时,若address 连接地址使用https,默认使用baetyl-core签发的系统证书
- http类型消息节点只能作为一条rule的target,并且http请求的Content-Type为application/json
- http-server类型仅可作为rule的source,且该类型仅可存在一个配置,用户调用时,使用POST请求访问地址`http://{ip}:{port}/rules/{ruleName}` 来触发调用

## Demo示例

### 消息流转+函数计算

下面示例定义了三条规则:

- rule1:订阅broker/topic1消息,将消息作为函数`py-demo1/func1`的输入,将函数计算结果输出至`broker/topic2`
- rule2:订阅broker/topic3消息,将消息作为函数`py-demo1/func2`的输入,将函数计算结果通过http POST请求发送至`http://127.0.0.1:8554/rule/result`

```yaml
clients:
- name: http-server
kind: http
address: 'http://127.0.0.1:8554'
rules:
- name: rule1
source:
topic: broker/topic1
target:
topic: broker/topic2
function:
name: py-demo1/func1
- name: rule2
source:
topic: broker/topic3
target:
client: http-server
path: /rule/result
method: POST
function:
name: py-demo1/func2
logger:
level: debug
encoding: console
```
5 changes: 4 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
)

type Client interface {
SendOrDrop(pkt mqtt.Packet) error
SendOrDrop(method string, pkt *mqtt.Publish) error
SendPubAck(pkt mqtt.Packet) error
Start(obs mqtt.Observer)
ResetClient(cfg *mqtt.ClientConfig)
SetReconnectCallback(callback mqtt.ReconnectCallback)
io.Closer
}
82 changes: 82 additions & 0 deletions client/http_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package client

import (
"bytes"
"crypto/tls"
"fmt"
"strings"

"github.com/baetyl/baetyl-go/v2/context"
"github.com/baetyl/baetyl-go/v2/errors"
"github.com/baetyl/baetyl-go/v2/http"
"github.com/baetyl/baetyl-go/v2/mqtt"
"github.com/baetyl/baetyl-go/v2/utils"
)

type HTTPClient struct {
cli *http.Client
address string
}

func NewHTTPClient(ctx context.Context, cfg *mqtt.ClientConfig) (Client, error) {
options := http.NewClientOptions()
options.Address = cfg.Address
if strings.HasPrefix(cfg.Address, "https") {
var tlsCfg *tls.Config
var err error
if cfg.CA == "" || cfg.Cert == "" || cfg.Key == "" {
cert := ctx.SystemConfig().Certificate
cert.InsecureSkipVerify = true
tlsCfg, err = utils.NewTLSConfigClient(cert)
} else {
tlsCfg, err = utils.NewTLSConfigClient(utils.Certificate{
CA: cfg.CA,
Cert: cfg.Cert,
Key: cfg.Key,
InsecureSkipVerify: cfg.InsecureSkipVerify,
})
}
if err != nil {
return nil, err
}
options.TLSConfig = tlsCfg
}
cli := http.NewClient(options)
return &HTTPClient{
cli: cli,
address: cfg.Address,
}, nil
}

func (h *HTTPClient) SendOrDrop(method string, pkt *mqtt.Publish) error {
header := map[string]string{"Content-Type": "application/json"}
res, err := h.cli.SendUrl(strings.ToUpper(method), fmt.Sprintf("%s%s", h.address, pkt.Message.Topic), bytes.NewReader(pkt.Message.Payload), header)
if err != nil {
return errors.Trace(err)
}
if res.StatusCode != 200 {
return errors.New(res.Status)
}
return nil
}

func (h *HTTPClient) SendPubAck(pkt mqtt.Packet) error {
return nil
}

func (h *HTTPClient) Start(obs mqtt.Observer) {
return
}

func (h *HTTPClient) ResetClient(cfg *mqtt.ClientConfig) {
return
}

func (h *HTTPClient) SetReconnectCallback(callback mqtt.ReconnectCallback) {
return
}

// Close closes client
func (h *HTTPClient) Close() error {
return nil
}
19 changes: 18 additions & 1 deletion client/mqtt_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,31 @@ func NewMqttClient(cfg *mqtt.ClientConfig) (Client, error) {
return source, nil
}

func (m *MqttClient) SendOrDrop(pkt mqtt.Packet) error {
func (m *MqttClient) SendOrDrop(method string, pkt *mqtt.Publish) error {
return m.cli.SendOrDrop(pkt)
}

func (m *MqttClient) SendPubAck(pkt mqtt.Packet) error {
return m.cli.SendOrDrop(pkt)
}

func (m *MqttClient) Start(obs mqtt.Observer) {
m.cli.Start(obs)
}

func (m *MqttClient) ResetClient(cfg *mqtt.ClientConfig) {
ops := &mqtt.ClientOptions{
ClientID: cfg.ClientID,
Username: cfg.Username,
Password: cfg.Password,
}
m.cli.ResetClient(ops)
}

func (m *MqttClient) SetReconnectCallback(callback mqtt.ReconnectCallback) {
m.cli.SetReconnectCallback(callback)
}

func (m *MqttClient) Close() error {
if m.cli != nil {
return m.cli.Close()
Expand Down
8 changes: 2 additions & 6 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,11 @@ func main() {
return err
}

rulers, err := rule.NewRulers(cfg, function)
rulers, err := rule.NewRulers(ctx, cfg, function)
defer rulers.Close()
if err != nil {
return err
}
defer func() {
for _, ruler := range rulers {
ruler.Close()
}
}()

ctx.Wait()
return nil
Expand Down
50 changes: 48 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,12 +1,58 @@
module github.com/baetyl/baetyl-rule/v2

go 1.13
go 1.18

require (
github.com/256dpi/gomqtt v0.14.3
github.com/baetyl/baetyl-broker/v2 v2.0.1-rc3
github.com/baetyl/baetyl-go/v2 v2.2.4-0.20220114042103-4ba035e5dfb7
github.com/baetyl/baetyl-go/v2 v2.2.4-0.20220507105142-99d4addbb3bf
github.com/qiangxue/fasthttp-routing v0.0.0-20160225050629-6ccdc2a18d87
github.com/stretchr/testify v1.6.1
github.com/valyala/fasthttp v1.9.0
)

require (
github.com/256dpi/mercury v0.2.0 // indirect
github.com/containerd/containerd v1.3.4 // indirect
github.com/creasty/defaults v1.4.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/dsnet/compress v0.0.1 // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gorilla/websocket v1.4.1 // indirect
github.com/jinzhu/copier v0.1.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/klauspost/compress v1.8.2 // indirect
github.com/klauspost/cpuid v1.2.1 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/mholt/archiver v3.1.1+incompatible // indirect
github.com/nwaples/rardecode v1.1.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/ulikunitz/xz v0.5.7 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
go.etcd.io/bbolt v1.3.4 // indirect
go.uber.org/atomic v1.6.0 // indirect
go.uber.org/multierr v1.5.0 // indirect
go.uber.org/zap v1.16.0 // indirect
golang.org/x/net v0.0.0-20200625001655-4c5254603344 // indirect
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd // indirect
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 // indirect
google.golang.org/grpc v1.25.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect
gopkg.in/validator.v2 v2.0.0-20191107172027-c3144fdedc21 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
k8s.io/apimachinery v0.0.0-20190817020851-f2f3a405f61d // indirect
k8s.io/klog v0.3.1 // indirect
)
Loading

0 comments on commit 9d28182

Please sign in to comment.