Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions #215

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
- run:
name: Test
command: |
go test -coverprofile=coverage.txt -coverpkg=./... -covermode=atomic -v ./...
go test -race -coverprofile=coverage.txt -coverpkg=./... -covermode=atomic -v ./...
bash <(curl -s https://codecov.io/bash)
build:
docker:
Expand Down
2 changes: 2 additions & 0 deletions pkg/core/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,11 @@ func (o *Operator) Stop() {
o.stopped = true

for _, srv := range o.services {
srv.inPort.Close()
srv.outPort.Close()
}
for _, dlg := range o.delegates {
dlg.inPort.Close()
dlg.outPort.Close()
}

Expand Down
43 changes: 31 additions & 12 deletions pkg/core/port.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ func (p *Port) Map(name string) *Port {
return port
}

func (p *Port) Lock() {
p.mutex.Lock()
}

func (p *Port) Unlock() {
p.mutex.Unlock()
}

// Returns the length of the map ports
func (p *Port) MapSize() int {
return len(p.subs)
Expand Down Expand Up @@ -300,12 +308,12 @@ func (p *Port) Close() {
if p.closed {
return
}

p.Lock()
p.closed = true

if p.buf != nil {
close(p.buf)
}
p.Unlock()

if p.sub != nil {
p.sub.Close()
Expand Down Expand Up @@ -405,7 +413,7 @@ func (p *Port) assertChannelSpace() {
c := cap(p.buf)
if len(p.buf) > c/2 {
newChan := make(chan interface{}, 2*c)
p.mutex.Lock()
p.Lock()
for {
select {
case i := <-p.buf:
Expand All @@ -416,7 +424,7 @@ func (p *Port) assertChannelSpace() {
}
end:
p.buf = newChan
p.mutex.Unlock()
p.Unlock()
}
}

Expand All @@ -439,17 +447,20 @@ func (p *Port) Closed() bool {

// Push an item to this port.
func (p *Port) Push(item interface{}) {
p.Lock()
if p.closed {
p.Unlock()
return
}
p.Unlock()

if p.buf != nil {
if CHANNEL_DYNAMIC {
p.assertChannelSpace()

p.mutex.Lock()
p.Lock()
p.buf <- item
p.mutex.Unlock()
p.Unlock()
} else {
p.buf <- item
}
Expand Down Expand Up @@ -526,18 +537,26 @@ func (p *Port) Pull() interface{} {
if p.buf != nil {
if CHANNEL_DYNAMIC {
for {
p.mutex.Lock()
p.Lock()
select {
case i := <-p.buf:
p.mutex.Unlock()
p.Unlock()
return i
default:
p.mutex.Unlock()
p.Unlock()
}
time.Sleep(1 * time.Millisecond)
}
} else {
return <-p.buf
i, ok := <-p.buf
if !ok {
// the channel was activly closed
// but we still send the zero value
// we recievied - this solves a strange
// race condition detected with `go test -race ...`
return i
}
return i
}
}

Expand Down Expand Up @@ -677,9 +696,9 @@ func (p *Port) Poll() interface{} {

var i interface{}
if CHANNEL_DYNAMIC {
p.mutex.Lock()
p.Lock()
i = <-p.buf
p.mutex.Unlock()
p.Unlock()
} else {
i = <-p.buf
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/elem/control_reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ var controlReduceCfg = &builtinConfig{

i = in.Stream().Pull()
if in.OwnEOS(i) {
mutex.Lock()
done = true
mutex.Unlock()
break
}

Expand Down
13 changes: 11 additions & 2 deletions pkg/elem/meta_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
type storePipe struct {
index int
items []interface{}
port *core.Port
}

type store map[*core.Port]*storePipe
Expand All @@ -17,11 +18,15 @@ func (s store) attachPort(p *core.Port) {
if p.Primitive() {
s[p] = &storePipe{
index: 0,
port: p,
items: []interface{}{},
}
go func() {
for !p.Operator().Stopped() {
s[p].items = append(s[p].items, p.Pull())
i := p.Pull()
p.Lock()
s[p].items = append(s[p].items, i)
p.Unlock()
}
}()
} else if p.Type() == core.TYPE_MAP {
Expand All @@ -34,12 +39,16 @@ func (s store) attachPort(p *core.Port) {
}

func (p *storePipe) next() interface{} {
p.port.Lock()
if p.index >= len(p.items) {
p.port.Unlock()
return core.PHMultiple
}
index := p.index
p.index++
return p.items[index]
r := p.items[index]
p.port.Unlock()
return r
}

func (s store) pull(p *core.Port) interface{} {
Expand Down