Skip to content

Latest commit

 

History

History
181 lines (141 loc) · 3.27 KB

backpressure-01.md

File metadata and controls

181 lines (141 loc) · 3.27 KB

Backpressure

Status

inactive

Context

I can't remember what this example is meant to demonstrate.

package main

import (
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"
)

const batchTime = 3

func init() {
	rand.Seed(time.Now().UnixNano())
}

type RunContext struct {
	inChan    chan Event
	batchChan chan EventBatch
	backChan  chan EventBatch
	doneChan  chan bool

	// For reporting purpose
	sumProduced int64
	sumSent     int64
}

// Event simple event
type Event int

// Slice of events
type EventBatch []Event

func NewRunContext() *RunContext {
	return &RunContext{
		inChan:    make(chan Event),
		batchChan: make(chan EventBatch),
		backChan:  make(chan EventBatch, 1),
		doneChan:  make(chan bool),
	}
}

func NewEventBatch() EventBatch {
	return EventBatch{}
}

func (e EventBatch) Merge(other Event) EventBatch {
	return append(e, other)
}

func produce(runCtx *RunContext) {
	defer close(runCtx.inChan)
	for {
		delay := time.Duration(rand.Intn(5)+1) * time.Second
		time.Sleep(delay)

		nMessages := rand.Intn(10) + 1

		for i := 0; i < nMessages; i++ {
			// Generate a random value
			e := Event(rand.Intn(10))

			fmt.Println("Producing:", e)
			select {
			case runCtx.inChan <- e:
				atomic.AddInt64(&runCtx.sumProduced, int64(e))
			case <-runCtx.doneChan:
				fmt.Println("producer completed")
				return
			}
		}
	}
}

func run(runCtx *RunContext, wg *sync.WaitGroup) {
	defer wg.Done()

	eventBatch := NewEventBatch()

	ticker := time.Tick(time.Duration(batchTime) * time.Second)

LOOP:
	for {
		select {
		case ev, ok := <-runCtx.inChan:
			if !ok {
				if len(eventBatch) > 0 {
					fmt.Println("Dispatching last batch")

					runCtx.batchChan <- eventBatch
				}
				close(runCtx.batchChan)
				fmt.Println("run finished")
				break LOOP
			}
			eventBatch = eventBatch.Merge(ev)
		case <-ticker:
			if len(eventBatch) > 0 {
				fmt.Println("waiting to send")
				runCtx.batchChan <- eventBatch
				eventBatch = <-runCtx.backChan
			}
		}

	}
}

func batchWriter(runCtx *RunContext, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		eb, ok := <-runCtx.batchChan
		if !ok {
			fmt.Println("batch writer completed")
			return
		}

		// Simulate time to persist the batch using a random delay
		delay := time.Duration(rand.Intn(3)+1) * time.Second
		time.Sleep(delay)

		// Perform retry if  this write fails and add an exponential backoff

		for _, e := range eb {
			atomic.AddInt64(&runCtx.sumSent, int64(e))
		}

		fmt.Println("batch sent:", eb, delay)
		runCtx.backChan <- NewEventBatch()
	}
}

func waitOnSignal(runCtx *RunContext, sigs <-chan os.Signal) {
	fmt.Println("awaiting signals")
	sig := <-sigs
	fmt.Println(sig)
	// Shut down input
	close(runCtx.doneChan)
}

func main() {

	var wg sync.WaitGroup

	runCtx := NewRunContext()
	sigs := make(chan os.Signal, 1)

	// if you hit CTRL-C or kill the process this channel will
	// get a signal and trigger a shutdown of the publisher
	// which in turn should trigger a each step of the pipeline
	// to exit
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go waitOnSignal(runCtx, sigs)
	go produce(runCtx)

	wg.Add(2)
	go run(runCtx, &wg)
	go batchWriter(runCtx, &wg)

	wg.Wait()

	fmt.Println("summary")
	fmt.Printf("	produced: %d\n", runCtx.sumProduced)
	fmt.Printf("	sent: %d\n", runCtx.sumSent)
}