Skip to content

Commit

Permalink
Merge pull request #1 from xiaoenai/v2
Browse files Browse the repository at this point in the history
V2
  • Loading branch information
swxctx authored Jun 29, 2020
2 parents ac24e84 + cebeaf1 commit 7436a9f
Show file tree
Hide file tree
Showing 11 changed files with 520 additions and 1 deletion.
16 changes: 16 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Binaries for programs and plugins
*.exe
*.dll
*.so
*.dylib

# Test binary, build with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/

go.sum
53 changes: 52 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,53 @@
# goredis
Golang Redis tools, Build based on https://github.com/go-redis/redis
[![GoDoc](http://godoc.org/github.com/xiaoenai/goredis?status.svg)](http://godoc.org/github.com/xiaoenai/goredis)

Golang Redis tools, Build based on `github.com/go-redis/redis/v7` v7.0.0-beta.4

## Example

```
package main
import (
"log"
"time"
"github.com/xiaoenai/goredis/v2"
)
func main() {
cfg, err := redis.ReadConfig("test_redis")
if err != nil {
log.Fatalf("redis.ReadConfig(\"test_redis\"): %v", err)
}
c, err := redis.NewClient(cfg)
if err != nil {
log.Fatalf("redis.NewClient(\"test_redis\"): %v", err)
}
m := redis.NewModule("test", "v1.0")
s, err := c.Set(m.Key("a_key"), "a_value", time.Second).Result()
if err != nil {
log.Fatalf("c.Set().Result() error: %v", err)
}
log.Printf("c.Set().Result() result: %s", s)
s, err = c.Get(m.Key("a_key")).Result()
if err != nil {
log.Fatalf("c.Get().Result() error: %v", err)
}
log.Printf("c.Get().Result() result: %s", s)
time.Sleep(2 * time.Second)
s, err = c.Get(m.Key("a_key")).Result()
if err == nil {
log.Fatalf("[after 2s] c.Get().Result() result: %s", s)
}
log.Printf("[after 2s] c.Get().Result() error: %s", err)
}
```

## API doc

[http://godoc.org/gopkg.in/go-redis/redis.v7](http://godoc.org/gopkg.in/go-redis/redis.v7)
145 changes: 145 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package goredis

import (
"fmt"
"time"

"github.com/go-redis/redis/v7"
)

// Client redis (cluster) client.
type (
Client struct {
cfg *Config
Cmdable
}
Cmdable interface {
redis.Cmdable
Subscribe(channels ...string) *redis.PubSub
// tx watch
Watch(fn func(*redis.Tx) error, keys ...string) error
}
// Alias
PubSub = redis.PubSub
Message = redis.Message
GeoLocation = redis.GeoLocation
GeoRadiusQuery = redis.GeoRadiusQuery
ZRangeBy = redis.ZRangeBy
Z = redis.Z
Pipeliner = redis.Pipeliner
RedisCmdable = redis.Cmdable
SliceCmd = redis.SliceCmd
StatusCmd = redis.StatusCmd
Cmder = redis.Cmder
IntCmd = redis.IntCmd
DurationCmd = redis.DurationCmd
BoolCmd = redis.BoolCmd
StringCmd = redis.StringCmd
FloatCmd = redis.FloatCmd
StringSliceCmd = redis.StringSliceCmd
BoolSliceCmd = redis.BoolSliceCmd
StringStringMapCmd = redis.StringStringMapCmd
StringIntMapCmd = redis.StringIntMapCmd
ZSliceCmd = redis.ZSliceCmd
ScanCmd = redis.ScanCmd
ClusterSlotsCmd = redis.ClusterSlotsCmd
)

// NewClient creates a redis(cluster) client from yaml config, and pings the client.
func NewClient(cfg *Config) (*Client, error) {
var c = &Client{
cfg: cfg,
}
switch cfg.DeployType {
case TypeSingle:
c.Cmdable = redis.NewClient(&redis.Options{
Addr: cfg.ForSingle.Addr,
Password: cfg.Password,
MaxRetries: cfg.MaxRetries,
MaxRetryBackoff: time.Duration(cfg.ForSingle.MaxRetryBackoff) * time.Second,
DialTimeout: time.Duration(cfg.DialTimeout) * time.Second,
ReadTimeout: time.Duration(cfg.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(cfg.WriteTimeout) * time.Second,
PoolSize: cfg.PoolSizePerNode,
PoolTimeout: time.Duration(cfg.PoolTimeout) * time.Second,
IdleTimeout: time.Duration(cfg.IdleTimeout) * time.Second,
IdleCheckFrequency: time.Duration(cfg.IdleCheckFrequency) * time.Second,
})

case TypeCluster:
c.Cmdable = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: cfg.ForCluster.Addrs,
MaxRedirects: cfg.ForCluster.MaxRedirects,
ReadOnly: cfg.ReadOnly,
RouteByLatency: cfg.ForCluster.RouteByLatency,
Password: cfg.Password,
MaxRetries: cfg.MaxRetries,
DialTimeout: time.Duration(cfg.DialTimeout) * time.Second,
ReadTimeout: time.Duration(cfg.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(cfg.WriteTimeout) * time.Second,
PoolSize: cfg.PoolSizePerNode,
PoolTimeout: time.Duration(cfg.PoolTimeout) * time.Second,
IdleTimeout: time.Duration(cfg.IdleTimeout) * time.Second,
IdleCheckFrequency: time.Duration(cfg.IdleCheckFrequency) * time.Second,
})

default:
return nil, fmt.Errorf("redis.Config.DeployType: optional enumeration list: %s, %s", TypeSingle, TypeCluster)
}

if _, err := c.Ping().Result(); err != nil {
return nil, err
}
return c, nil
}

// Config returns config.
func (c *Client) Config() *Config {
return c.cfg
}

// IsCluster returns whether it is a cluster.
func (c *Client) IsCluster() bool {
return c.cfg.DeployType == TypeCluster
}

// ToSingle tries to convert it to *redis.Client.
func (c *Client) ToSingle() (*redis.Client, bool) {
cli, ok := c.Cmdable.(*redis.Client)
return cli, ok
}

// ToCluster tries to convert it to *redis.ClusterClient.
func (c *Client) ToCluster() (*redis.ClusterClient, bool) {
clu, ok := c.Cmdable.(*redis.ClusterClient)
return clu, ok
}

// LockCallback 使用分布式锁执行回调函数
// 注意:每10毫秒尝试1次上锁,且上锁后默认锁定1分钟
func (c *Client) LockCallback(lockKey string, callback func(), maxLock ...time.Duration) error {
var d = time.Minute
if len(maxLock) > 0 {
d = maxLock[0]
}
// lock
for lockOk, err := c.SetNX(lockKey, "", d).Result(); !lockOk; lockOk, err = c.SetNX(lockKey, "", d).Result() {
if err != nil && !IsRedisNil(err) {
return err
}
time.Sleep(time.Millisecond * 10)
}
// unlock
defer c.Del(lockKey)
// do
callback()
return nil
}

// Redis nil reply, .e.g. when key does not exist.
const Nil = redis.Nil

// IsRedisNil Is the redis nil reply? .e.g. when key does not exist.
func IsRedisNil(err error) bool {
return redis.Nil == err
}
40 changes: 40 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package goredis

import (
"testing"
"time"

"github.com/go-redis/redis/v7"
)

func TestClient(t *testing.T) {
cfg, err := ReadConfig("test_redis")
if err != nil {
t.Fatal("ReadConfig(\"test_redis\")", err)
}
c, err := NewClient(cfg)
if err != nil {
t.Fatal("NewClient(\"test_redis\")", err)
}

m := NewModule("test")

s, err := c.Set(m.Key("a_key"), "a_value", time.Second).Result()
if err != nil {
t.Fatal("c.Set().Result() error:", err)
}
t.Logf("c.Set().Result() result: %s", s)

s, err = c.Get(m.Key("a_key")).Result()
if err != nil {
t.Fatal("c.Get().Result() error:", err)
}
t.Logf("c.Get().Result() result: %s", s)
time.Sleep(2 * time.Second)

s, err = c.Get(m.Key("a_key")).Result()
if err == nil {
t.Fatalf("[after 2s] c.Get().Result() result: %s", s)
}
t.Logf("[after 2s] c.Get().Result() is null ?: %v", err == redis.Nil)
}
129 changes: 129 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package goredis

import (
"fmt"

"github.com/henrylee2cn/cfgo"
)

type (
// Config redis (cluster) client config
Config struct {
// redis deploy type, [single, cluster]
DeployType string `yaml:"deploy_type"`
// only for single node config, valid when DeployType=single.
ForSingle SingleConfig `yaml:"for_single"`
// only for cluster config, valid when DeployType=cluster.
ForCluster ClusterConfig `yaml:"for_cluster"`

// An optional password. Must match the password specified in the
// requirepass server configuration option.
Password string `yaml:"password,omitempty"`

// The maximum number of retries before giving up.
// Default is to not retry failed commands.
MaxRetries int `yaml:"max_retries,omitempty"`

// Dial timeout for establishing new connections.
// Default is 5 seconds.
DialTimeout int64 `yaml:"dial_timeout,omitempty"`
// Timeout for socket reads. If reached, commands will fail
// with a timeout instead of blocking.
// Default is 3 seconds.
ReadTimeout int64 `yaml:"read_timeout,omitempty"`
// Timeout for socket writes. If reached, commands will fail
// with a timeout instead of blocking.
// Default is ReadTimeout.
WriteTimeout int64 `yaml:"write_timeout,omitempty"`

// PoolSizePerNode applies per cluster node and not for the whole cluster.
// Maximum number of socket connections.
// Default is 10 connections per every CPU as reported by runtime.NumCPU.
PoolSizePerNode int `yaml:"pool_size_per_node"`
// Amount of time client waits for connection if all connections
// are busy before returning an error.
// Default is ReadTimeout + 1 second.
PoolTimeout int64 `yaml:"pool_timeout,omitempty"`
// Amount of time after which client closes idle connections.
// Should be less than server's timeout.
// Default is 300 seconds.
IdleTimeout int64 `yaml:"idle_timeout"`
// Frequency of idle checks.
// Default is 60 seconds.
// When minus value is set, then idle check is disabled.
IdleCheckFrequency int64 `yaml:"idle_check_frequency,omitempty"`

// Enables read only queries on slave nodes.
// Only for cluster.
ReadOnly bool `yaml:"read_only,omitempty"`

init bool
}

// SingleConfig redis single node client config.
SingleConfig struct {
// host:port address.
Addr string `yaml:"addr"`

// Maximum backoff between each retry.
// Default is 512 seconds; -1 disables backoff.
MaxRetryBackoff int64 `yaml:"max_retry_backoff,omitempty"`
}

// ClusterConfig redis cluster client config.
ClusterConfig struct {
// A seed list of host:port addresses of cluster nodes.
Addrs []string `yaml:"addrs"`

// The maximum number of retries before giving up. Command is retried
// on network errors and MOVED/ASK redirects.
// Default is 16.
MaxRedirects int `yaml:"max_redirects,omitempty"`

// Enables routing read-only queries to the closest master or slave node.
RouteByLatency bool `yaml:"route_by_latency,omitempty"`
}
)

// deploy types
const (
TypeSingle = "single"
TypeCluster = "cluster"
)

// Reload reloads config.
func (cfg *Config) Reload(bind cfgo.BindFunc) error {
if cfg.init {
return nil
}
err := bind()
if err != nil {
return err
}
cfg.init = true
if cfg.DeployType != TypeSingle && cfg.DeployType != TypeCluster {
return fmt.Errorf("redis config: deploy_type optional enumeration list: %s, %s", TypeSingle, TypeCluster)
}
return nil
}

// ReadConfig read config from specified yaml section.
func ReadConfig(configSection string) (*Config, error) {
var cfg = NewConfig()
var err error
if cfgo.IsReg(configSection) {
err = cfgo.BindSection(configSection, cfg)
} else {
err = cfgo.Reg(configSection, cfg)
}
return cfg, err
}

// NewConfig creates a default config.
func NewConfig() *Config {
return &Config{
DeployType: TypeSingle,
ForSingle: SingleConfig{Addr: "127.0.0.1:6379"},
// ForCluster: ClusterConfig{Addrs: []string{"127.0.0.1:6379"}},
}
}
8 changes: 8 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
test_redis:
deploy_type: single
for_single:
addr: 127.0.0.1:6379
for_cluster:
addrs: []
pool_size_per_node: 0
idle_timeout: 0
Loading

0 comments on commit 7436a9f

Please sign in to comment.