Rename/reword to semaphore
aertje committed Dec 10, 2024
1 parent e9341ad commit 8134fd5
Showing 6 changed files with 79 additions and 60 deletions.
60 changes: 30 additions & 30 deletions README.md
@@ -1,14 +1,14 @@
# Nice
# Semaphore

[![Go Reference](https://pkg.go.dev/badge/github.com/aertje/gonice.svg)](https://pkg.go.dev/github.com/aertje/gonice)
[![Go Report Card](https://goreportcard.com/badge/github.com/aertje/gonice)](https://goreportcard.com/report/github.com/aertje/gonice)
[![Go Reference](https://pkg.go.dev/badge/github.com/aertje/semaphore.svg)](https://pkg.go.dev/github.com/aertje/semaphore)
[![Go Report Card](https://goreportcard.com/badge/github.com/aertje/semaphore)](https://goreportcard.com/report/github.com/aertje/semaphore)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

The `nice` package provides a priority-based concurrency control mechanism. It allows you to manage the execution of functions based on their priority while respecting a maximum concurrency limit. This is particularly useful in scenarios where certain tasks need to be prioritised over others, and there is a need to limit the number of concurrent tasks to avoid overloading the system.
The `semaphore` package provides a priority-based concurrency control mechanism. It allows you to manage the execution of functions based on their priority while respecting a maximum concurrency limit. This is particularly useful in scenarios where certain tasks need to be prioritised over others, and there is a need to limit the number of concurrent tasks to avoid overloading the system.

The general use case is to prioritise certain CPU-bound tasks over others. In a web service, for example, it could be used to prioritise the alive endpoint over the metrics endpoint, or to serve bulk requests before real-time requests.

The implementation does not interfere with Go's runtime scheduler. It is opt-in and does not affect the behavior of other goroutines in the application.

## Features

@@ -21,12 +21,12 @@ The implementation does not interfere with Go's runtime scheduler. It is opt-in
To install the package, use the following command:

```sh
go get github.com/aertje/gonice
go get github.com/aertje/semaphore
```

## Simple example

The following minimal example demonstrates how to use the `nice` package to create a scheduler that starts tasks based on their priority. It illustrates the required steps to create a scheduler, register a task with a specific priority, and signal the completion of the task.
The following minimal example demonstrates how to use the `semaphore` package to create a prioritized semaphore that admits tasks in priority order. It illustrates the required steps to create the semaphore, register a task with a specific priority, and signal the completion of the task.

```go
package main
@@ -35,17 +35,17 @@ import (
"fmt"
"time"

"github.com/aertje/gonice/nice"
"github.com/aertje/semaphore/semaphore"
)

func main() {
// Create a new scheduler with the default maximum concurrency limit.
scheduler := nice.NewScheduler()
// Create a new prioritized semaphore with the default maximum concurrency limit.
s := semaphore.NewPrioritized()

// Register a task with the scheduler with a priority of 1.
fnDone := scheduler.Wait(1)
// Signal the completion of the task.
defer fnDone()
// Register a task with the semaphore with a priority of 1.
s.Acquire(1)
// Signal the completion of the task when done.
defer s.Release()

// Simulate a long-running task.
time.Sleep(1 * time.Second)
@@ -54,23 +54,23 @@ func main() {

The steps are as follows:

- Create a new scheduler with an optional maximum concurrency limit.
- Create a new semaphore with an optional maximum concurrency limit.

Then, for each task to be prioritised:

- Register a task with the scheduler using the `Wait` method. This will block until the task can be executed. It returns a function that should be called to signal the completion of the task.
- Register a task with the semaphore using the `Acquire` method. This will block until the task can be executed.
- Execute the task.
- Call the function returned from the call to `Wait` to signal the completion of the task to the scheduler.
- Call the `Release` method to signal the completion of the task to the semaphore.

Note the importance of calling the function returned by `Wait` to signal the completion of the task. This is necessary to allow other tasks to be executed by the scheduler.
Note the importance of calling the `Release` method to signal the completion of the task. This is necessary to allow other tasks to be executed by the semaphore.

If the context needs to be taken into account in order to support cancellation, the `WaitContext` method can be used instead.
If the context needs to be taken into account in order to support cancellation, the `AcquireContext` method can be used instead.
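
The acquire/release flow described above can be sketched with a stdlib-only toy priority semaphore. This is not the package's actual implementation (see `semaphore/semaphore.go` in this commit), and the `PrioritySemaphore` type and its method names here are illustrative only; it just shows the idea of capping concurrency while releasing blocked acquirers in priority order (lower value first):

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
)

// waiter is a queued acquirer; lower priority values are admitted first.
type waiter struct {
	priority int
	ready    chan struct{}
}

// waiterQueue implements heap.Interface as a min-heap on priority.
type waiterQueue []*waiter

func (q waiterQueue) Len() int           { return len(q) }
func (q waiterQueue) Less(i, j int) bool { return q[i].priority < q[j].priority }
func (q waiterQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *waiterQueue) Push(x any)        { *q = append(*q, x.(*waiter)) }
func (q *waiterQueue) Pop() any {
	old := *q
	n := len(old)
	w := old[n-1]
	*q = old[:n-1]
	return w
}

// PrioritySemaphore is a toy priority semaphore: at most max concurrent
// holders, and blocked acquirers are released best-priority first.
type PrioritySemaphore struct {
	mu      sync.Mutex
	max     int
	held    int
	waiters waiterQueue
}

func New(max int) *PrioritySemaphore {
	return &PrioritySemaphore{max: max}
}

// Acquire blocks until a slot is available for this priority.
func (s *PrioritySemaphore) Acquire(priority int) {
	s.mu.Lock()
	if s.held < s.max {
		s.held++
		s.mu.Unlock()
		return
	}
	w := &waiter{priority: priority, ready: make(chan struct{})}
	heap.Push(&s.waiters, w)
	s.mu.Unlock()
	<-w.ready // block until a Release hands this waiter the slot
}

// Release frees a slot, handing it directly to the best waiter if any.
func (s *PrioritySemaphore) Release() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.waiters.Len() > 0 {
		w := heap.Pop(&s.waiters).(*waiter)
		close(w.ready) // slot passes to the waiter; held count unchanged
		return
	}
	s.held--
}

func main() {
	s := New(1)
	s.Acquire(0)
	// ... critical section ...
	s.Release()
	fmt.Println("acquired and released")
}
```

Handing the slot directly to the popped waiter (rather than waking everyone to re-contend) keeps the priority ordering strict.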

## Example use case: Prioritizing `/alive` endpoint

In this example, we will create a scheduler that prioritises an `/alive` endpoint over other endpoints. This is useful in scenarios where the `/alive` endpoint is critical and needs to be executed before other endpoints.
In this example, we will create a semaphore that prioritises an `/alive` endpoint over other endpoints. This is useful in scenarios where the `/alive` endpoint is critical and needs to be executed before other endpoints.

It also demonstrates use of the `WaitContext` method to support context cancellation. This is useful in scenarios where the client cancels the request, and the server should dispose of the task.
It also demonstrates use of the `AcquireContext` method to support context cancellation. This is useful in scenarios where the client cancels the request, and the server should dispose of the task.

```go
package main
@@ -81,16 +81,16 @@ import (
"net/http"
"time"

"github.com/aertje/gonice/nice"
"github.com/aertje/semaphore/semaphore"
)

func main() {
// Create a new scheduler with a maximum concurrency limit of 10.
scheduler := nice.NewScheduler(nice.WithMaxConcurrency(10))
// Create a new semaphore with a maximum concurrency limit of 10.
s := semaphore.NewPrioritized(semaphore.WithMaxConcurrency(10))

http.HandleFunc("/alive", func(w http.ResponseWriter, r *http.Request) {
// Register a task with the scheduler with a higher priority of 1.
fnDone, err := scheduler.WaitContext(r.Context(), 1)
// Register a task with the semaphore with a higher priority of 1.
err := s.AcquireContext(r.Context(), 1)
if err != nil {
if errors.Is(err, context.Canceled) {
http.Error(w, context.Cause(r.Context()).Error(), 499)
@@ -100,14 +100,14 @@ func main() {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer fnDone()
defer s.Release()

w.Write([]byte("I'm alive!"))
})

http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
// Register a task with the scheduler with a lower priority of 2.
fnDone, err := scheduler.WaitContext(r.Context(), 2)
// Register a task with the semaphore with a lower priority of 2.
err := s.AcquireContext(r.Context(), 2)
if err != nil {
if errors.Is(err, context.Canceled) {
http.Error(w, context.Cause(r.Context()).Error(), 499)
@@ -118,7 +118,7 @@ func main() {
return
}

defer fnDone()
defer s.Release()

time.Sleep(1 * time.Second)

20 changes: 20 additions & 0 deletions context/context.go
@@ -0,0 +1,20 @@
package context

import (
"context"

"github.com/aertje/semaphore/semaphore"
)

type schedulerKey struct{}

var key = schedulerKey{}

func SchedulerFromContext(ctx context.Context) (*semaphore.Prioritized, bool) {
s, ok := ctx.Value(key).(*semaphore.Prioritized)
return s, ok
}

func WithScheduler(ctx context.Context, s *semaphore.Prioritized) context.Context {
return context.WithValue(ctx, key, s)
}
2 changes: 1 addition & 1 deletion go.mod
@@ -1,4 +1,4 @@
module github.com/aertje/gonice
module github.com/aertje/semaphore

go 1.23.0

2 changes: 1 addition & 1 deletion queue/queue_test.go
@@ -4,7 +4,7 @@ import (
"container/heap"
"testing"

"github.com/aertje/gonice/queue"
"github.com/aertje/semaphore/queue"
"github.com/stretchr/testify/assert"
)

25 changes: 12 additions & 13 deletions nice/nice.go → semaphore/semaphore.go
@@ -1,21 +1,20 @@
package nice
package semaphore

import (
"container/heap"
"context"
"runtime"
"sync"

"github.com/aertje/gonice/queue"
"github.com/aertje/semaphore/queue"
)

type entry struct {
priority int
waitChan chan<- struct{}
cancelChan <-chan struct{}
}

type Scheduler struct {
type Prioritized struct {
maxConcurrency int

concurrency int
@@ -24,16 +23,16 @@ type Scheduler struct {
entries *queue.Q[entry]
}

type Option func(*Scheduler)
type Option func(*Prioritized)

func WithMaxConcurrency(maxConcurrency int) Option {
return func(p *Scheduler) {
return func(p *Prioritized) {
p.maxConcurrency = maxConcurrency
}
}

func NewScheduler(opts ...Option) *Scheduler {
s := &Scheduler{
func NewPrioritized(opts ...Option) *Prioritized {
s := &Prioritized{
maxConcurrency: runtime.GOMAXPROCS(0),
entries: new(queue.Q[entry]),
}
@@ -47,7 +46,7 @@ func NewScheduler(opts ...Option) *Scheduler {
return s
}

func (s *Scheduler) assessEntries() {
func (s *Prioritized) assessEntries() {
s.lock.Lock()
defer s.lock.Unlock()

@@ -72,7 +71,7 @@ }
}
}

func (s *Scheduler) WaitContext(ctx context.Context, priority int) error {
func (s *Prioritized) AcquireContext(ctx context.Context, priority int) error {
waitChan := make(chan struct{})
cancelChan := make(chan struct{})

@@ -98,11 +97,11 @@ func (s *Scheduler) WaitContext(ctx context.Context, priority int) error {
}
}

func (s *Scheduler) Wait(priority int) {
s.WaitContext(context.Background(), priority)
func (s *Prioritized) Acquire(priority int) {
s.AcquireContext(context.Background(), priority)
}

func (s *Scheduler) Done() {
func (s *Prioritized) Release() {
s.lock.Lock()
defer s.lock.Unlock()
s.concurrency--
30 changes: 15 additions & 15 deletions nice/nice_test.go → semaphore/semaphore_test.go
@@ -1,21 +1,21 @@
package nice_test
package semaphore_test

import (
"context"
"sync"
"testing"
"time"

"github.com/aertje/gonice/nice"
"github.com/aertje/semaphore/semaphore"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSimple(t *testing.T) {
s := nice.NewScheduler()
s := semaphore.NewPrioritized()

s.Wait(1)
s.Done()
s.Acquire(1)
s.Release()
}

func TestOrderConcurrency(t *testing.T) {
@@ -52,14 +52,14 @@ }
}

func testOrderForConcurrency(maxConcurrency int, totalTasks int) []int {
s := nice.NewScheduler(nice.WithMaxConcurrency(maxConcurrency))
s := semaphore.NewPrioritized(semaphore.WithMaxConcurrency(maxConcurrency))

// Saturate the semaphore, otherwise subsequent tasks will be executed
// immediately in undefined order.
for i := 0; i < maxConcurrency; i++ {
go func() {
s.Wait(0)
defer s.Done()
s.Acquire(0)
defer s.Release()
time.Sleep(10 * time.Millisecond)
}()
}
@@ -78,8 +78,8 @@ func testOrderForConcurrency(maxConcurrency int, totalTasks int) []int {
go func() {
defer wg.Done()

s.Wait(priority)
defer s.Done()
s.Acquire(priority)
defer s.Release()

time.Sleep(10 * time.Millisecond)

@@ -96,14 +96,14 @@ }
}

func TestCancel(t *testing.T) {
s := nice.NewScheduler(nice.WithMaxConcurrency(1))
s := semaphore.NewPrioritized(semaphore.WithMaxConcurrency(1))

// Saturate the semaphore, otherwise the task under test will be executed
// immediately without waiting.
go func() {
s.Wait(0)
s.Acquire(0)
time.Sleep(10 * time.Millisecond)
s.Done()
s.Release()
}()

// Give the semaphore some time to start the goroutine.
@@ -112,8 +112,8 @@ func TestCancel(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cancel()

err := s.WaitContext(ctx, 1)
defer s.Done()
err := s.AcquireContext(ctx, 1)
defer s.Release()

require.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)
