diff --git a/closure.go b/parallel/closure.go similarity index 75% rename from closure.go rename to parallel/closure.go index d1e330e..43a9915 100644 --- a/closure.go +++ b/parallel/closure.go @@ -8,5 +8,10 @@ func Compose(f, g func(x float) float) } } -print(Compose(sin, cos)(0.5)) + +func main() { + print(Compose(sin, cos)(0.5)) +} + + diff --git a/parallel/db.go b/parallel/db.go new file mode 100644 index 0000000..6e546b2 --- /dev/null +++ b/parallel/db.go @@ -0,0 +1,13 @@ +package parallel + + +func Query(conns []Conn, query string) Result { + ch := make(chan Result, len(conns)) // buffered + for _, conn := range conns { + go func(c Conn) { + ch <- c.DoQuery(query): + }(conn) + } + return <-ch +} + diff --git a/parallel/loadBalancer.go b/parallel/loadBalancer.go new file mode 100644 index 0000000..429da6d --- /dev/null +++ b/parallel/loadBalancer.go @@ -0,0 +1,16 @@ + +package main + + + +func Run() { + in, out := make(chan *Work), make(chan *Work) + + for i := 0; i < NumWorkers; i++ { + go worker(in, out) + } + + go sendLotsOfWork(in) + + receiveLotsOfResults(out) +} diff --git a/parallel/req.go b/parallel/req.go new file mode 100644 index 0000000..f00fd9c --- /dev/null +++ b/parallel/req.go @@ -0,0 +1,89 @@ +package main + + +type Request struct { + fn func() int // The operation to perform. + c chan int // The channel to return the result. +} + +// A channel of requests, plus some load tracking data. +type Worker struct { + requests chan Request // work to do (buffered channel) + pending int // count of pending tasks + index int // index in the heap +} + +type Pool []*Worker + +type Balancer struct { + pool Pool + done chan *Worker +} + + +func requester(work chan<- Request) { + c := make(chan int) + for { + // Kill some time (fake load). + Sleep(rand.Int63n(nWorker * 2 * Second)) + work <- Request{workFn, c} // send request + result := <-c // wait for answer + furtherProcess(result) + } +} + + +// Balancer sends request to most lightly loaded worker +func (w *Worker) work(done chan *Worker) { + for { + req := <-w.requests // get Request from balancer + req.c <- req.fn() // call fn and send result + done <- w // we've finished this request + } +} + +// Balancer function + +func (b *Balancer) balance(work chan Request) { + for { + select { + case req := <-work: // received a Request... + b.dispatch(req) // ...so send it to a Worker + case w := <-b.done: // a worker has finished ... + b.completed(w) // ...so update its info + } + } +} + +// Make Pool an implementation of the Heap interface by providing a few methods such as: +func (p Pool) Less(i, j int) bool { + return p[i].pending < p[j].pending +} + +// Dispatch +// All the pieces are in place. + +// Send Request to worker +func (b *Balancer) dispatch(req Request) { + // Grab the least loaded worker... + w := heap.Pop(&b.pool).(*Worker) + // ...send it the task. + w.requests <- req + // One more in its work queue. + w.pending++ + // Put it into its place on the heap. + heap.Push(&b.pool, w) +} + +// Completed + +// Job is complete; update heap +func (b *Balancer) completed(w *Worker) { + // One fewer in the queue. + w.pending-- + // Remove it from heap. + heap.Remove(&b.pool, w.index) + // Put it into its place on the heap. + heap.Push(&b.pool, w) +} +