Skip to content

Commit

Permalink
all: initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
changkun committed Jan 22, 2021
1 parent 4c755ad commit 335e9ad
Show file tree
Hide file tree
Showing 7 changed files with 464 additions and 2 deletions.
32 changes: 32 additions & 0 deletions .github/workflows/thread.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: thread

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

jobs:
build:
name: Build
runs-on: ubuntu-latest
steps:

- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.13
id: go

- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Get dependencies
run: |
go get -v -t -d ./...
- name: Race Test
run: |
go test -v -race -count=1 ./...
- name: Test
run: |
go test -v -coverprofile=coverage.txt -covermode=atomic ./...
24 changes: 22 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,22 @@
# thread
thead facilities in Go
# thread [![PkgGoDev](https://pkg.go.dev/badge/golang.design/x/thread)](https://pkg.go.dev/golang.design/x/thread) [![Go Report Card](https://goreportcard.com/badge/golang.design/x/thread)](https://goreportcard.com/report/golang.design/x/thread) ![thread](https://github.com/golang-design/thread/workflows/thread/badge.svg?branch=main)

Package thread provides threading facilities, such as scheduling
calls on a specific thread, local storage, etc.

```go
import "golang.design/x/thread"
```

## Quick Start

```go
th := thread.New()

th.Call(func() {
// call on the created thread
})
```

## License

MIT © 2021 The golang.design Initiative
43 changes: 43 additions & 0 deletions bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package thread_test

import (
"testing"

"golang.design/x/thread"
)

func BenchmarkThread_Call(b *testing.B) {
th := thread.New()
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
th.Call(func() {})
}
}
func BenchmarkThread_CallV(b *testing.B) {
th := thread.New()
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
_ = th.CallV(func() interface{} {
return true
}).(bool)
}
}

func BenchmarkThread_TLS(b *testing.B) {
th := thread.New()
th.Call(func() {
th.SetTLS(1)
})

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = th.CallV(func() interface{} {
return th.GetTLS()
})
}
}
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module golang.design/x/thread

go 1.13

require golang.org/x/sys v0.0.0-20210122093101-04d7465088b8
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
golang.org/x/sys v0.0.0-20210122093101-04d7465088b8 h1:de2yTH1xuxjmGB7i6Z5o2z3RCHVa0XlpSZzjd8Fe6bE=
golang.org/x/sys v0.0.0-20210122093101-04d7465088b8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
220 changes: 220 additions & 0 deletions thread.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Copyright 2020 The golang.design Initiative Authors.
// All rights reserved. Use of this source code is governed
// by a MIT license that can be found in the LICENSE file.

// Package thread provides threading facilities, such as scheduling
// calls on a specific thread, local storage, etc.
package thread // import "golang.design/x/thread"

import (
"runtime"
"sync"
"sync/atomic"
)

// Thread represents a thread instance.
type Thread interface {
// ID returns the ID of the thread.
ID() uint64

// Call calls fn from the given thread. It blocks until fn returns.
Call(fn func())

// CallNonBlock call fn from the given thread without waiting
// fn to complete.
CallNonBlock(fn func())

// CallV call fn from the given thread and returns the returned
// value from fn.
//
// The purpose of this function is to avoid value escaping.
// In particular:
//
// th := thread.New()
// var ret interface{}
// th.Call(func() {
// ret = 1
// })
//
// will cause variable ret be allocated on the heap, whereas
//
// th := thread.New()
// ret := th.CallV(func() interface{} {
// return 1
// }).(int)
//
// will offer zero allocation benefits.
CallV(fn func() interface{}) interface{}

// SetTLS stores a given value to the local storage of the given
// thread. This method must be accessed in Call, or CallV, or
// CallNonBlock. For instance:
//
// th := thread.New()
// th.Call(func() {
// th.SetTLS("store in thread local storage")
// })
SetTLS(x interface{})

// GetTLS returns the locally stored value from local storage of
// the given thread. This method must be access in Call, or CallV,
// or CallNonBlock. For instance:
//
// th := thread.New()
// th.Call(func() {
// tls := th.GetTLS()
// // ... do what ever you want to do with tls value ...
// })
//
GetTLS() interface{}

// Terminate terminates the given thread gracefully.
// Scheduled but unexecuted calls will be discarded.
Terminate()
}

// New creates a new thread instance.
func New() Thread {
th := thread{
id: atomic.AddUint64(&globalID, 1),
fdCh: make(chan funcData, runtime.GOMAXPROCS(0)),
doneCh: make(chan struct{}),
}
runtime.SetFinalizer(&th, func(th interface{}) {
th.(*thread).Terminate()
})
go func() {
runtime.LockOSThread()
for {
select {
case fd := <-th.fdCh:
func() {
if fd.fn != nil {
defer func() {
if fd.done != nil {
fd.done <- struct{}{}
}
}()
fd.fn()
} else if fd.fnv != nil {
var ret interface{}
defer func() {
if fd.ret != nil {
fd.ret <- ret
}
}()
ret = fd.fnv()
}
}()
case <-th.doneCh:
close(th.doneCh)
return
}
}
}()
return &th
}

var (
donePool = sync.Pool{
New: func() interface{} {
return make(chan struct{})
},
}
varPool = sync.Pool{
New: func() interface{} {
return make(chan interface{})
},
}
globalID uint64 // atomic
_ Thread = &thread{}
)

type funcData struct {
fn func()
done chan struct{}

fnv func() interface{}
ret chan interface{}
}

type thread struct {
id uint64
tls interface{}

fdCh chan funcData
doneCh chan struct{}
}

func (th thread) ID() uint64 {
return th.id
}

func (th *thread) Call(fn func()) {
if fn == nil {
return
}

select {
case <-th.doneCh:
return
default:
done := donePool.Get().(chan struct{})
defer donePool.Put(done)
defer func() { <-done }()

th.fdCh <- funcData{fn: fn, done: done}
}
return
}

func (th *thread) CallNonBlock(fn func()) {
if fn == nil {
return
}
select {
case <-th.doneCh:
return
default:
th.fdCh <- funcData{fn: fn}
}
}

func (th *thread) CallV(fn func() interface{}) (ret interface{}) {
if fn == nil {
return nil
}

select {
case <-th.doneCh:
return nil
default:
done := varPool.Get().(chan interface{})
defer varPool.Put(done)
defer func() { ret = <-done }()

th.fdCh <- funcData{fnv: fn, ret: done}
return
}
}

func (th *thread) GetTLS() interface{} {
return th.tls
}

func (th *thread) SetTLS(x interface{}) {
th.tls = x
}

func (th *thread) Terminate() {
select {
case <-th.doneCh:
return
default:
th.doneCh <- struct{}{}
select {
case <-th.doneCh:
return
}
}
}
Loading

0 comments on commit 335e9ad

Please sign in to comment.