Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove quiescent period #740

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions cmd/serf/command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,11 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer) *Agent {
serfConfig.Tags = config.Tags
serfConfig.SnapshotPath = config.SnapshotPath
serfConfig.ProtocolVersion = uint8(config.Protocol)
serfConfig.CoalescePeriod = 3 * time.Second
serfConfig.QuiescentPeriod = time.Second
serfConfig.CoalescePeriod = time.Second
serfConfig.QueryResponseSizeLimit = config.QueryResponseSizeLimit
serfConfig.QuerySizeLimit = config.QuerySizeLimit
serfConfig.UserEventSizeLimit = config.UserEventSizeLimit
serfConfig.UserCoalescePeriod = 3 * time.Second
serfConfig.UserQuiescentPeriod = time.Second
serfConfig.UserCoalescePeriod = time.Second
if config.ReconnectInterval != 0 {
serfConfig.ReconnectInterval = config.ReconnectInterval
}
Expand Down
41 changes: 7 additions & 34 deletions serf/coalesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,16 @@ type coalescer interface {
// coalescedEventCh returns an event channel where the events are coalesced
// using the given coalescer.
func coalescedEventCh(outCh chan<- Event, shutdownCh <-chan struct{},
cPeriod time.Duration, qPeriod time.Duration, c coalescer) chan<- Event {
cPeriod time.Duration, c coalescer) chan<- Event {
inCh := make(chan Event, 1024)
go coalesceLoop(inCh, outCh, shutdownCh, cPeriod, qPeriod, c)
go coalesceLoop(inCh, outCh, shutdownCh, cPeriod, c)
return inCh
}

// coalesceLoop is a simple long-running routine that manages the high-level
// flow of coalescing based on quiescence and a maximum quantum period.
func coalesceLoop(inCh <-chan Event, outCh chan<- Event, shutdownCh <-chan struct{},
coalescePeriod time.Duration, quiescentPeriod time.Duration, c coalescer) {
var quiescent <-chan time.Time
var quantum <-chan time.Time
shutdown := false

INGEST:
// Reset the timers
quantum = nil
quiescent = nil

coalescePeriod time.Duration, c coalescer) {
for {
select {
case e := <-inCh:
Expand All @@ -52,32 +43,14 @@ INGEST:
continue
}

// Start a new quantum if we need to
// and restart the quiescent timer
if quantum == nil {
quantum = time.After(coalescePeriod)
}
quiescent = time.After(quiescentPeriod)

// Coalesce the event
c.Coalesce(e)

case <-quantum:
goto FLUSH
case <-quiescent:
goto FLUSH
case <-time.After(coalescePeriod):
c.Flush(outCh)
case <-shutdownCh:
shutdown = true
goto FLUSH
c.Flush(outCh)
return
}
}

FLUSH:
// Flush the coalesced events
c.Flush(outCh)

// Restart ingestion if we are not done
if !shutdown {
goto INGEST
}
}
4 changes: 2 additions & 2 deletions serf/coalesce_member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestMemberEventCoalesce_Basic(t *testing.T) {
}

inCh := coalescedEventCh(outCh, shutdownCh,
5*time.Millisecond, 5*time.Millisecond, c)
5*time.Millisecond, c)

send := []Event{
MemberEvent{
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestMemberEventCoalesce_TagUpdate(t *testing.T) {
}

inCh := coalescedEventCh(outCh, shutdownCh,
5*time.Millisecond, 5*time.Millisecond, c)
5*time.Millisecond, c)

inCh <- MemberEvent{
Type: EventMemberUpdate,
Expand Down
38 changes: 4 additions & 34 deletions serf/coalesce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ func (c *mockCoalesce) Flush(outChan chan<- Event) {
c.value = 0
}

func testCoalescer(cPeriod, qPeriod time.Duration) (chan<- Event, <-chan Event, chan<- struct{}) {
func testCoalescer(cPeriod time.Duration) (chan<- Event, <-chan Event, chan<- struct{}) {
in := make(chan Event, 64)
out := make(chan Event)
shutdown := make(chan struct{})
c := &mockCoalesce{}
go coalesceLoop(in, out, shutdown, cPeriod, qPeriod, c)
go coalesceLoop(in, out, shutdown, cPeriod, c)
return in, out, shutdown
}

func TestCoalescer_basic(t *testing.T) {
in, out, shutdown := testCoalescer(5*time.Millisecond, time.Second)
in, out, shutdown := testCoalescer(5 * time.Millisecond)
defer close(shutdown)

send := []Event{
Expand All @@ -79,38 +79,8 @@ func TestCoalescer_basic(t *testing.T) {
}
}

func TestCoalescer_quiescent(t *testing.T) {
// This tests the quiescence by creating a long coalescence period
// with a short quiescent period and waiting only a multiple of the
// quiescent period for results.
in, out, shutdown := testCoalescer(10*time.Second, 10*time.Millisecond)
defer close(shutdown)

send := []Event{
counterEvent{1},
counterEvent{39},
counterEvent{2},
}
for _, e := range send {
in <- e
}

select {
case e := <-out:
if e.EventType() != EventCounter {
t.Fatalf("expected counter, got: %d", e.EventType())
}

if e.(counterEvent).delta != 42 {
t.Fatalf("bad: %#v", e)
}
case <-time.After(50 * time.Millisecond):
t.Fatalf("timeout")
}
}

func TestCoalescer_passThrough(t *testing.T) {
in, out, shutdown := testCoalescer(time.Second, time.Second)
in, out, shutdown := testCoalescer(time.Second)
defer close(shutdown)

send := []Event{
Expand Down
2 changes: 1 addition & 1 deletion serf/coalesce_user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestUserEventCoalesce_Basic(t *testing.T) {
}

inCh := coalescedEventCh(outCh, shutdownCh,
5*time.Millisecond, 5*time.Millisecond, c)
5*time.Millisecond, c)

send := []Event{
UserEvent{
Expand Down
6 changes: 2 additions & 4 deletions serf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,12 @@ type Config struct {
// seconds, then the events will be coalesced and dispatched if no
// new events are received within 2 seconds of the last event. Otherwise,
// every event will always be delayed by at least 10 seconds.
CoalescePeriod time.Duration
QuiescentPeriod time.Duration
CoalescePeriod time.Duration

// The settings below relate to Serf's user event coalescing feature.
// The settings operate like above but only affect user messages and
// not the Member* messages that Serf generates.
UserCoalescePeriod time.Duration
UserQuiescentPeriod time.Duration
UserCoalescePeriod time.Duration

// The settings below relate to Serf keeping track of recently
// failed/left nodes and attempting reconnects.
Expand Down
8 changes: 4 additions & 4 deletions serf/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,24 +291,24 @@ func Create(conf *Config) (*Serf, error) {
}

// Check if serf member event coalescing is enabled
if conf.CoalescePeriod > 0 && conf.QuiescentPeriod > 0 && conf.EventCh != nil {
if conf.CoalescePeriod > 0 && conf.EventCh != nil {
c := &memberEventCoalescer{
lastEvents: make(map[string]EventType),
latestEvents: make(map[string]coalesceEvent),
}

conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,
conf.CoalescePeriod, conf.QuiescentPeriod, c)
conf.CoalescePeriod, c)
}

// Check if user event coalescing is enabled
if conf.UserCoalescePeriod > 0 && conf.UserQuiescentPeriod > 0 && conf.EventCh != nil {
if conf.UserCoalescePeriod > 0 && conf.EventCh != nil {
c := &userEventCoalescer{
events: make(map[string]*latestUserEvents),
}

conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,
conf.UserCoalescePeriod, conf.UserQuiescentPeriod, c)
conf.UserCoalescePeriod, c)
}

// Listen for internal Serf queries. This is setup before the snapshotter, since
Expand Down