Skip to content

Commit

Permalink
test - migrate to dapr clientconn and removal of app-id
Browse files Browse the repository at this point in the history
Signed-off-by: mikeee <[email protected]>
  • Loading branch information
mikeee committed Jan 8, 2024
1 parent 2fe6d7a commit b1ab5f9
Show file tree
Hide file tree
Showing 7 changed files with 526 additions and 11 deletions.
2 changes: 2 additions & 0 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ expected_stdout_lines:
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
- '== APP == workflow paused'
- '== APP == workflow resumed'
- '== APP == stage: 1'
- '== APP == workflow event raised'
- '== APP == stage: 2'
- '== APP == workflow status: COMPLETED'
Expand Down Expand Up @@ -55,6 +56,7 @@ dapr run --app-id workflow-sequential \
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
- '== APP == workflow paused'
- '== APP == workflow resumed'
- '== APP == stage: 1'
- '== APP == workflow event raised'
- '== APP == stage: 2'
- '== APP == workflow status: COMPLETED'
Expand Down
8 changes: 4 additions & 4 deletions examples/workflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"fmt"
"google.golang.org/grpc/metadata"
"log"
"sync"
"time"
Expand All @@ -19,7 +18,7 @@ const (
)

func main() {
wr, err := workflow.NewRuntime("localhost", "50001")
wr, err := workflow.NewRuntime(":50001")
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -54,7 +53,6 @@ func main() {
log.Fatalf("failed to intialise client: %v", err)
}
ctx := context.Background()
ctx = metadata.AppendToOutgoingContext(ctx, "dapr-app-id", "workflow-sequential")

// Start workflow test
respStart, err := daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
Expand Down Expand Up @@ -116,7 +114,9 @@ func main() {
log.Fatalf("workflow not running")
}

fmt.Printf("workflow resumed\n")
fmt.Println("workflow resumed")

fmt.Printf("stage: %d", stage)

// Raise Event Test

Expand Down
175 changes: 175 additions & 0 deletions workflow/internal/parse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
"errors"
"fmt"
"net"
"net/url"
"strings"
)

// Parsed represents a parsed gRPC endpoint.
type Parsed struct {
Target string
TLS bool
}

//nolint:revive
func ParseGRPCEndpoint(endpoint string) (Parsed, error) {
target := endpoint
if len(target) == 0 {
return Parsed{}, errors.New("target is required")
}

var dnsAuthority string
var hostname string
var tls bool

urlSplit := strings.Split(target, ":")
if len(urlSplit) == 3 && !strings.Contains(target, "://") {
target = strings.Replace(target, ":", "://", 1)
} else if len(urlSplit) >= 2 && !strings.Contains(target, "://") && schemeKnown(urlSplit[0]) {
target = strings.Replace(target, ":", "://", 1)
} else {
urlSplit = strings.Split(target, "://")
if len(urlSplit) == 1 {
target = "dns://" + target
} else {
scheme := urlSplit[0]
if !schemeKnown(scheme) {
return Parsed{}, fmt.Errorf("unknown scheme: %q", scheme)
}

if scheme == "dns" {
urlSplit = strings.Split(target, "/")
if len(urlSplit) < 4 {
return Parsed{}, fmt.Errorf("invalid dns scheme: %q", target)
}
dnsAuthority = urlSplit[2]
target = "dns://" + urlSplit[3]
}
}
}

ptarget, err := url.Parse(target)
if err != nil {
return Parsed{}, err
}

var errs []string
for k := range ptarget.Query() {
if k != "tls" {
errs = append(errs, fmt.Sprintf("unrecognized query parameter: %q", k))
}
}
if len(errs) > 0 {
return Parsed{}, fmt.Errorf("failed to parse target %q: %s", target, strings.Join(errs, "; "))
}

if ptarget.Query().Has("tls") {
if ptarget.Scheme == "http" || ptarget.Scheme == "https" {
return Parsed{}, errors.New("cannot use tls query parameter with http(s) scheme")
}

qtls := ptarget.Query().Get("tls")
if qtls != "true" && qtls != "false" {
return Parsed{}, fmt.Errorf("invalid value for tls query parameter: %q", qtls)
}

tls = qtls == "true"
}

scheme := ptarget.Scheme
if scheme == "https" {
tls = true
}
if scheme == "http" || scheme == "https" {
scheme = "dns"
}

hostname = ptarget.Host

host, port, err := net.SplitHostPort(hostname)
aerr, ok := err.(*net.AddrError)
if ok && aerr.Err == "missing port in address" {
port = "443"
} else if err != nil {
return Parsed{}, err
} else {
hostname = host
}

if len(hostname) == 0 {
if scheme == "dns" {
hostname = "localhost"
} else {
hostname = ptarget.Path
}
}

switch scheme {
case "unix":
separator := ":"
if strings.HasPrefix(endpoint, "unix://") {
separator = "://"
}
target = scheme + separator + hostname

case "vsock":
target = scheme + ":" + hostname + ":" + port

case "unix-abstract":
target = scheme + ":" + hostname

case "dns":
if len(ptarget.Path) > 0 {
return Parsed{}, fmt.Errorf("path is not allowed: %q", ptarget.Path)
}

if strings.Count(hostname, ":") == 7 && !strings.HasPrefix(hostname, "[") && !strings.HasSuffix(hostname, "]") {
hostname = "[" + hostname + "]"
}
if len(dnsAuthority) > 0 {
dnsAuthority = "//" + dnsAuthority + "/"
}
target = scheme + ":" + dnsAuthority + hostname + ":" + port

default:
return Parsed{}, fmt.Errorf("unsupported scheme: %q", scheme)
}

return Parsed{
Target: target,
TLS: tls,
}, nil
}

func schemeKnown(scheme string) bool {
for _, s := range []string{
"dns",
"unix",
"unix-abstract",
"vsock",
"http",
"https",
} {
if scheme == s {
return true
}
}

return false
}
Loading

0 comments on commit b1ab5f9

Please sign in to comment.