Skip to content

Commit

Permalink
parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
ahndmal committed Jan 28, 2024
1 parent c781da8 commit a0e77df
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 1 deletion.
7 changes: 6 additions & 1 deletion closure.go → parallel/closure.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,10 @@ func Compose(f, g func(x float) float)
}
}

print(Compose(sin, cos)(0.5))

func main() {
print(Compose(sin, cos)(0.5))
}



13 changes: 13 additions & 0 deletions parallel/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package parallel


func Query(conns []Conn, query string) Result {
ch := make(chan Result, len(conns)) // buffered
for _, conn := range conns {
go func(c Conn) {
ch <- c.DoQuery(query):
}(conn)
}
return <-ch
}

16 changes: 16 additions & 0 deletions parallel/loadBalancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

package main



func Run() {
in, out := make(chan *Work), make(chan *Work)

for i := 0; i < NumWorkers; i++ {
go worker(in, out)
}

go sendLotsOfWork(in)

receiveLotsOfResults(out)
}
89 changes: 89 additions & 0 deletions parallel/req.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main


type Request struct {
fn func() int // The operation to perform.
c chan int // The channel to return the result.
}

// A channel of requests, plus some load tracking data.
type Worker struct {
requests chan Request // work to do (buffered channel)
pending int // count of pending tasks
index int // index in the heap
}

type Pool []*Worker

type Balancer struct {
pool Pool
done chan *Worker
}


func requester(work chan<- Request) {
c := make(chan int)
for {
// Kill some time (fake load).
Sleep(rand.Int63n(nWorker * 2 * Second))
work <- Request{workFn, c} // send request
result := <-c // wait for answer
furtherProcess(result)
}
}


// Balancer sends request to most lightly loaded worker
func (w *Worker) work(done chan *Worker) {
for {
req := <-w.requests // get Request from balancer
req.c <- req.fn() // call fn and send result
done <- w // we've finished this request
}
}

// Balancer function

func (b *Balancer) balance(work chan Request) {
for {
select {
case req := <-work: // received a Request...
b.dispatch(req) // ...so send it to a Worker
case w := <-b.done: // a worker has finished ...
b.completed(w) // ...so update its info
}
}
}

// Make Pool an implementation of the Heap interface by providing a few methods such as:
func (p Pool) Less(i, j int) bool {
return p[i].pending < p[j].pending
}

// Dispatch
// All the pieces are in place.

// Send Request to worker
func (b *Balancer) dispatch(req Request) {
// Grab the least loaded worker...
w := heap.Pop(&b.pool).(*Worker)
// ...send it the task.
w.requests <- req
// One more in its work queue.
w.pending++
// Put it into its place on the heap.
heap.Push(&b.pool, w)
}

// Completed

// Job is complete; update heap
func (b *Balancer) completed(w *Worker) {
// One fewer in the queue.
w.pending--
// Remove it from heap.
heap.Remove(&b.pool, w.index)
// Put it into its place on the heap.
heap.Push(&b.pool, w)
}

0 comments on commit a0e77df

Please sign in to comment.