diff --git a/README.md b/README.md index 399e212..90e746c 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,12 @@ and printed in the console ./grpcr --input-file-directory="/tmp/mycapture" --output-stdout --output-grpc="grpc://127.0.0.1:35002" ``` +Capture gRPC requests on "127.0.0.1:35001", +keep only requests whose method suffix is Time, and print them in the console +``` +./grpcr --input-raw="127.0.0.1:35001" --output-stdout --include-filter-method-match=".*Time$" +``` + ### The captured request looks like ``` { @@ -127,5 +133,5 @@ and [buger/goreplay](https://github.com/buger/goreplay) * [x] 9)Support for reading GRPC requests from files * [ ] 10)Support reading GRPC requests from kafka * [ ] 11)Support for reading GRPC requests from RocketMQ -* [ ] 12)Support custom filter +* [x] 12)Support custom filter * [ ] 13)support TLS diff --git a/README_zh.md b/README_zh.md index 2a33432..26da763 100644 --- a/README_zh.md +++ b/README_zh.md @@ -79,6 +79,11 @@ sudo -s ./grpcr --input-file-directory="/tmp/mycapture" --output-stdout --output-grpc="grpc://127.0.0.1:35002" ``` +捕获"127.0.0.1:35001"上的gRPC请求,只保留method后缀为Time的请求,并打印在控制台中 +``` +./grpcr --input-raw="127.0.0.1:35001" --output-stdout --include-filter-method-match=".*Time$" +``` + ### 捕获的请求形如 ``` { @@ -125,5 +130,5 @@ export SIMPLE_LOG_LEVEL=debug * [x] 9)支持从文件中读取GRPC请求 * [ ] 10)支持从kafka中读取GRPC请求 * [ ] 11)支持从RocketMQ中读取GRPC请求 -* [ ] 12)支持自定义filter +* [x] 12)支持自定义filter * [ ] 13)支持TLS diff --git a/biz/filter.go b/biz/filter.go index 891a96e..7eb6493 100644 --- a/biz/filter.go +++ b/biz/filter.go @@ -8,5 +8,10 @@ import ( func NewFilterChain(settings *config.AppSettings) (filter.Filter, error) { c := filter.NewFilterChain() c.AddExcludeFilters(filter.NewMethodExcludeFilter("grpc.reflection")) + + if len(settings.IncludeFilterMethodMatch) > 0 { + f := filter.NewMethodMatchIncludeFilter(settings.IncludeFilterMethodMatch) + c.AddIncludeFilter(f) + } return c, nil } diff --git a/config/settings.go b/config/settings.go index 2659cdb..4d01767 100644 --- a/config/settings.go +++ b/config/settings.go @@ -77,6 +77,8 @@ type AppSettings struct { OutputFileMaxAge int `json:"output-file-max-age"` OutputKafkaConfig plugin.OutputKafkaConfig + // --- filter --- + IncludeFilterMethodMatch string `json:"include-filter-method-match"` // --- other --- Codec string `json:"codec"` } diff --git a/example/search_client/client.go b/example/search_client/client.go index a481914..ace96e3 100644 --- a/example/search_client/client.go +++ b/example/search_client/client.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "log" + "math/rand" "time" pb "github.com/vearne/grpcreplay/example/search_proto" @@ -26,6 +27,20 @@ func main() { } defer conn.Close() + client := pb.NewSearchServiceClient(conn) + counter := 0 + for i := 0; i < 1000000; i++ { + if rand.Intn(1000)%2 == 0 { + counter++ + sendSearch(client, counter) + } else { + sendCurrTime(client) + } + time.Sleep(10 * time.Second) + } +} + +func sendSearch(client pb.SearchServiceClient, i int) { // add some headers md := metadata.New(map[string]string{ "testkey1": "testvalue1", @@ -33,28 +48,41 @@ func main() { }) ctx := metadata.NewOutgoingContext(context.Background(), md) - client := pb.NewSearchServiceClient(conn) - for i := 0; i < 1000000; i++ { - resp, err := client.Search(ctx, - &pb.SearchRequest{ - StaffName: "zhangsan", - Age: uint32(i), - Gender: true, - }, - ) - if err != nil { - statusErr, ok := status.FromError(err) - if ok { - if statusErr.Code() == codes.DeadlineExceeded { - log.Fatalln("client.Search err: deadline") - } + resp, err := client.Search(ctx, + &pb.SearchRequest{ + StaffName: "zhangsan", + Age: uint32(i), + Gender: true, + }, + ) + if err != nil { + statusErr, ok := status.FromError(err) + if ok { + if statusErr.Code() == codes.DeadlineExceeded { + log.Fatalln("client.Search err: deadline") } - - log.Fatalf("client.Search err: %v", err) } - bt, _ := json.Marshal(resp) - log.Println("resp:", string(bt)) - time.Sleep(10 * time.Second) + log.Fatalf("client.Search err: %v", err) + } + + bt, _ := json.Marshal(resp) + log.Println("resp:", string(bt)) +} + +func sendCurrTime(client pb.SearchServiceClient) { + md := metadata.New(map[string]string{ + "testkey3": "testvalue3", + "testkey4": "testvalue4", + }) + ctx := metadata.NewOutgoingContext(context.Background(), md) + resp, err := client.CurrentTime( + ctx, + &pb.TimeRequest{}, + ) + if err != nil { + log.Fatalf("client.CurrentTime err: %v", err) } + bt, _ := json.Marshal(resp) + log.Println("resp:", string(bt)) } diff --git a/filter/include_filter.go b/filter/include_filter.go index 4a28b11..4034f96 100644 --- a/filter/include_filter.go +++ b/filter/include_filter.go @@ -1 +1,29 @@ package filter + +import ( + "github.com/vearne/grpcreplay/protocol" + slog "github.com/vearne/simplelog" + "regexp" +) + +type MethodMatchIncludeFilter struct { + r *regexp.Regexp +} + +func NewMethodMatchIncludeFilter(expr string) *MethodMatchIncludeFilter { + var f MethodMatchIncludeFilter + var err error + f.r, err = regexp.Compile(expr) + if err != nil { + slog.Fatal("expr error:%v", err) + } + return &f +} + +// Filter :If ok is true, it means that the message can pass +func (f *MethodMatchIncludeFilter) Filter(msg *protocol.Message) (*protocol.Message, bool) { + if f.r.MatchString(msg.Data.Method) { + return msg, true + } + return nil, false +} diff --git a/main.go b/main.go index f806a0d..bff96f9 100644 --- a/main.go +++ b/main.go @@ -60,6 +60,9 @@ func init() { based on the timestamp encoded in their filename`) flag.StringVar(&settings.Codec, "codec", "simple", "") + + flag.StringVar(&settings.IncludeFilterMethodMatch, "include-filter-method-match", "", + `filter requests when the method matches the specified regular expression`) } func main() {