Monday, August 14, 2017

Simple Thread Pool in Golang


Simple Thread Pool in golang with Simple Future response.

package main

import "fmt"
import "time"
import "sync"

// Call is a callable which returns a value
type Call func() interface{}

// Run is a runnable function
type Run func()

// Future is a holder for cal result.
type Future interface {
Done() bool
Result() interface{}
ResultAwait(time.Duration) interface{}
}

// future is an implementation of Future interface
type future struct {
done    bool
result  interface{}
reschan chan struct{}
sync.RWMutex
}

func (f *future) Done() bool {
f.RLock()
defer f.RUnlock()
return f.done
}

func (f *future) Result() interface{} {
f.RLock()
defer f.RUnlock()
return f.result
}

func (f *future) ResultAwait(d time.Duration) interface{} {
select {
case <-f.reschan:
case <-time.After(d):
}
return f.result
}

//--------------------------------------------------------

// worker is worker used in pool
type worker struct {
name string
}

// start method of worker, called when pool is initialized
func (w *worker) start(runq <-chan Run, quit <-chan struct{}) {
for {
select {
case f := <-runq:
f()
case <-quit:
return
}
}
}

//-------------------------------------------------------------

// Pool is pool
type Pool struct {
pool
}

// pool is a local pool used to implemnt pool methods
type pool struct {
size int
runq chan Run
quit chan struct{}
}

// NewPool is pool constructor
func NewPool(size int) *Pool {
var p = Pool{
pool: pool{
size: size,
runq: make(chan Run, size),
quit: make(chan struct{}, size),
},
}
p.init()
return &p
}

// init method to initialize the pool.
func (p *pool) init() {
for i := 0; i < p.size; i++ {
var w = worker{name: fmt.Sprintf("worker %d", i)}
go w.start(p.runq, p.quit)
}
}

// Stop the pool after all the submitted tasks are done.
func (p *pool) Stop() {
p.ExecuteRun(func() {
close(p.quit)
})
}

// ExecuteRun runs a runnable object.
func (p *pool) ExecuteRun(run Run) {
p.runq <- run
}

// ExecuteCall runs a callable function.
func (p *pool) ExecuteCall(call Call) Future {
var fut = future{reschan: make(chan struct{})}
fn := func() {
res := call()
fut.Lock()
defer fut.Unlock()
fut.done = true
fut.result = res
}
p.ExecuteRun(fn)
return &fut
}

//-----------------------------------------------------------------
func main() {
p := NewPool(10)
for i := 0; i < 5; i++ {
j := i + 1
p.ExecuteRun(func() {
fmt.Println("hello ", j)
})
}

time.Sleep(10 * time.Millisecond)

fut := p.ExecuteCall(hello)
res := fut.ResultAwait(30 * time.Millisecond)
fmt.Println("Result is", res)

fut1 := p.ExecuteCall(wrap(welcome))
res1 := fut1.ResultAwait(45 * time.Millisecond)
fmt.Println("Result for fut1 is:----->>>", res1)

time.Sleep(1 * time.Second)
p.Stop()
fmt.Println("Result after stop for fut is: -->>>>", fut.Result())
fmt.Println("Done............")

}

func hello() interface{} {
time.Sleep(45 * time.Millisecond)
return "Hello World."
}

func welcome() string {
time.Sleep(30 * time.Millisecond)
return "Welcome to go world."
}

// wrap is a wrapper function,
// as func() interface{} signature type does not match func() string signature type,
// Not sure why :)

func wrap(f func() string) func() interface{} {
return func() interface{} { return f() }
}

No comments:

Post a Comment