Skip to content

Commit

Permalink
Make it possible to listen on unix sockets (#21)
Browse files Browse the repository at this point in the history
* Make it possible to listen to unix sockets

+ rename listeners protocol to forwardProtocol
+ add listenProtocol option

* Move protocol finder to a separate function
  • Loading branch information
jkmw authored Jul 20, 2020
1 parent b46e51b commit d797359
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 20 deletions.
9 changes: 6 additions & 3 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ type Watch struct {
}

type Listener struct {
Address string `hcl:",key"`
Forward string `hcl:"forward"`
Protocol string `hcl:"protocol"`
Address string `hcl:",key"`
ListenProtocol string `hcl:"listenProtocol"`
Forward string `hcl:"forward"`
ForwardProtocol string `hcl:"forwardProtocol"`

Protocol string `hcl:"protocol"` // deprecated
}

type Laziness struct {
Expand Down
47 changes: 30 additions & 17 deletions pkg/proc/job_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,45 @@ import (
"context"
"errors"
"fmt"
"github.com/mittwald/mittnite/internal/config"
log "github.com/sirupsen/logrus"
"io"
"net"
"sync/atomic"
"time"

"github.com/mittwald/mittnite/internal/config"
log "github.com/sirupsen/logrus"
)

type Listener struct {
config *config.Listener
job *Job
socket net.Listener
spinUpTimeout time.Duration
config *config.Listener
job *Job
socket net.Listener
spinUpTimeout time.Duration
}

func NewListener(j *Job, c *config.Listener) (*Listener, error) {
log.WithField("address", c.Address).Info("starting TCP listener")

listener, err := net.Listen("tcp", c.Address)
// deprecation check
if c.Protocol != "" {
if c.ForwardProtocol == "" {
log.Warnf("field protocol in job %s is deprecated in favor of forwardProtocol", j.Config.Name)
c.ForwardProtocol = c.Protocol
} else {
log.Warnf("field protocol in job %s is ignored because it is deprecated and forwardProtocol is already set", j.Config.Name)
}
}

listener, err := net.Listen(getProto(c.ListenProtocol), c.Address)
if err != nil {
return nil, err
}

return &Listener{
config: c,
job: j,
socket: listener,
spinUpTimeout: j.spinUpTimeout,
config: c,
job: j,
socket: listener,
spinUpTimeout: j.spinUpTimeout,
}, nil
}

Expand All @@ -47,11 +58,6 @@ func (l *Listener) Run(ctx context.Context) error {
}

func (l *Listener) provideUpstreamConnection() (net.Conn, error) {
prot := l.config.Protocol
if prot == "" {
prot = "tcp"
}

timeout := time.NewTimer(l.spinUpTimeout)
ticker := time.NewTicker(20 * time.Millisecond)

Expand All @@ -61,7 +67,7 @@ func (l *Listener) provideUpstreamConnection() (net.Conn, error) {
for {
select {
case <-ticker.C:
conn, err := net.Dial(prot, l.config.Forward)
conn, err := net.Dial(getProto(l.config.ForwardProtocol), l.config.Forward)
if err == nil {
return conn, nil
}
Expand Down Expand Up @@ -137,3 +143,10 @@ func (l *Listener) run(ctx context.Context) <-chan error {

return errChan
}

func getProto(proto string) string {
if proto == "" {
proto = "tcp"
}
return proto
}

0 comments on commit d797359

Please sign in to comment.