Skip to content
This repository has been archived by the owner on Dec 19, 2023. It is now read-only.

Commit

Permalink
支持分布式
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyj committed Jan 16, 2018
1 parent c702388 commit 02f8a9a
Show file tree
Hide file tree
Showing 16 changed files with 1,286 additions and 767 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ server:
password: admin
addr: :8083
name: 10.10.99.177
master: ""
notifications:
dingtalk:
groups:
Expand All @@ -112,6 +113,7 @@ client:
server_url: http://admin:admin@localhost:8083
```

master 的支持来自 https://github.com/ihaiker/distributed-gosuv
Logs can be found in `$HOME/.gosuv/log/`

Edit config file(default located in `$HOME/.gosuv/programs.yml`) and run `gosuv reload` will take effect immediately.
Expand Down
39 changes: 39 additions & 0 deletions atomic64.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import "sync/atomic"

type Int64 struct {
v int64
}

func (a *Int64) Get() int64 {
return atomic.LoadInt64(&a.v)
}

func (a *Int64) Set(v int64) {
atomic.StoreInt64(&a.v, v)
}

func (a *Int64) CompareAndSwap(o, n int64) bool {
return atomic.CompareAndSwapInt64(&a.v, o, n)
}

func (a *Int64) Swap(v int64) int64 {
return atomic.SwapInt64(&a.v, v)
}

func (a *Int64) Add(v int64) int64 {
return atomic.AddInt64(&a.v, v)
}

func (a *Int64) Sub(v int64) int64 {
return a.Add(-v)
}

func (a *Int64) Incr() int64 {
return a.Add(1)
}

func (a *Int64) Decr() int64 {
return a.Add(-1)
}
40 changes: 40 additions & 0 deletions buffer_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
"bytes"
)

// BufferPool implements a pool of bytes.Buffers in the form of a bounded
// channel.
type BufferPool struct {
c chan *bytes.Buffer
}

// NewBufferPool creates a new BufferPool bounded to the given size.
func NewBufferPool(size int) (bp *BufferPool) {
return &BufferPool{
c: make(chan *bytes.Buffer, size),
}
}

// Get gets a Buffer from the BufferPool, or creates a new one if none are
// available in the pool.
func (bp *BufferPool) Get() (b *bytes.Buffer) {
select {
case b = <-bp.c:
// reuse existing buffer
default:
// create new buffer
b = bytes.NewBuffer([]byte{})
}
return
}

// Put returns the given Buffer to the BufferPool.
func (bp *BufferPool) Put(b *bytes.Buffer) {
b.Reset()
select {
case bp.c <- b:
default: // Discard the buffer if the pool is full.
}
}
7 changes: 7 additions & 0 deletions cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ func actionStartServer(c *cli.Context) error {
if err != nil {
log.Fatal(err)
}

if c.Bool("foreground") {
if err = newDistributed(suv, hdlr); err != nil {
log.Fatal(err)
}
}

auth := cfg.Server.HttpAuth
if auth.Enabled {
hdlr = httpauth.SimpleBasicAuth(auth.User, auth.Password)(hdlr)
Expand Down
5 changes: 3 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ type Configuration struct {
User string `yaml:"username"`
Password string `yaml:"password"`
} `yaml:"httpauth"`
Addr string `yaml:"addr"`
Name string `yaml:"name"`
Addr string `yaml:"addr"`
Name string `yaml:"name"`
Master string `yaml:"master"`
} `yaml:"server,omitempty"`
Notifications Notifications `yaml:"notifications,omitempty" json:"-"`

Expand Down
241 changes: 241 additions & 0 deletions distributed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/*
分布式实现
话外:作者的整体代码结构对于我的分布式修改太合适了
*/
package main

import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/bluele/gcache"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/qiniu/log"
)

type Cluster struct {
slaves gcache.Cache
client *http.Client
suv *Supervisor
}

func (cluster *Cluster) join() {
data := url.Values{"slave": []string{cfg.Server.Addr}}
request, err := http.NewRequest(http.MethodPost, "http://"+cfg.Server.Master+"/distributed/join", strings.NewReader(data.Encode()))
request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
request.Header.Add("Content-Length", strconv.Itoa(len(data.Encode())))

if err != nil {
log.Errorf("join cluster %s : %s", cfg.Server.Master, err)
return
}
cluster.auth(request)
resp, err := cluster.client.Do(request)
if err != nil {
log.Errorf("join cluster %s : %s", cfg.Server.Master, err)
return
}
if resp.StatusCode == http.StatusOK {
log.Debugf("join to master %s", cfg.Server.Master)
} else {
log.Debugf("join to master %s error: %d", cfg.Server.Master, resp.StatusCode)
}
}

func (cluster *Cluster) auth(request *http.Request) {
if cfg.Server.HttpAuth.Enabled {
request.SetBasicAuth(cfg.Server.HttpAuth.User, cfg.Server.HttpAuth.Password)
}
}

func (cluster *Cluster) dialWebSocket(wsUrl string) (*websocket.Conn, *http.Response, error) {
var dialer *websocket.Dialer
if cfg.Server.HttpAuth.Enabled {
dialer = &websocket.Dialer{Proxy: func(r *http.Request) (*url.URL, error) {
cluster.auth(r)
return websocket.DefaultDialer.Proxy(r)
}}
} else {
dialer = websocket.DefaultDialer
}
return dialer.Dial(wsUrl, nil)
}

func (cluster *Cluster) requestSlave(url, method string, bodyBuffer *bytes.Buffer) ([]byte, error) {
request, err := http.NewRequest(method, url, nil)
if err != nil {
return nil, err
}
cluster.auth(request)
resp, err := cluster.client.Do(request)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}

func (cluster *Cluster) cmdJoinCluster(w http.ResponseWriter, r *http.Request) {
slave := r.PostFormValue("slave")
if slave == "" {
w.WriteHeader(http.StatusForbidden)
return
}
if strings.HasPrefix(slave, ":") {
idx := strings.LastIndex(r.RemoteAddr, ":")
slave = r.RemoteAddr[:idx] + slave
}
log.Debugf("%s join cluster.", slave)
if out, err := cluster.slaves.Get(slave); err != nil || out == nil {
cluster.suv.broadcastEvent("new slave : " + slave)
}
cluster.slaves.Set(slave, slave)
w.WriteHeader(http.StatusOK)
}

//获取分布式系统下所有的内容
func (cluster *Cluster) cmdQueryDistributedPrograms(w http.ResponseWriter, r *http.Request) {
jsonOut := "{"
idx := 0
for _, v := range cluster.slaves.GetALL() {
slave := v.(string)
reqUrl := fmt.Sprintf("http://%s/api/programs", slave)
if body, err := cluster.requestSlave(reqUrl, http.MethodGet, nil); err == nil {
jsonOut += fmt.Sprintf("\"%s\":%s", slave, body)
}
if idx < cluster.slaves.Len()-1 {
jsonOut += ","
}
idx += 1
}
jsonOut += "}"
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(jsonOut))
}

func (cluster *Cluster) cmdSetting(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
slave := mux.Vars(r)["slave"]
cluster.suv.renderHTML(w, "setting", map[string]string{
"Name": name,
"Slave": slave,
})
}

func (cluster *Cluster) cmdWebSocketProxy(w http.ResponseWriter, r *http.Request) {
sock, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error("upgrade:", err)
return
}
defer sock.Close()

slave := mux.Vars(r)["slave"]
slaveUri := strings.Replace(r.RequestURI, "/distributed/"+slave, "", 1)
wsUrl := fmt.Sprintf("ws://%s%s", slave, slaveUri)
log.Infof("proxy websocket :%s", wsUrl)

ws, _, err := cluster.dialWebSocket(wsUrl)
if err != nil {
log.Error("dial:", err)
return
}
defer ws.Close()

for {
messageType, data, err := ws.ReadMessage()
if err != nil {
log.Error("read message:", err)
return
}
if messageType == websocket.CloseMessage {
log.Infof("close socket")
return
}
w, err := sock.NextWriter(messageType)
if err != nil {
log.Error("write err:", err)
return
}
_, err = w.Write(data)
if err != nil {
log.Error("read:", err)
return
}
}
}

func (cluster *Cluster) slaveHttpProxy(w http.ResponseWriter, r *http.Request) {
slave := mux.Vars(r)["slave"]
slaveUri := strings.Replace(r.RequestURI, "/distributed/"+slave, "", 1)
requestUrl := fmt.Sprintf("http://%s%s", slave, slaveUri)
log.Infof("proxy :%s %s", r.Method, requestUrl)

request, err := http.NewRequest(r.Method, requestUrl, r.Body)
for k, v := range r.Header {
request.Header.Set(k, strings.Join(v, ","))
}
if err != nil {
log.Error(err)
}
cluster.auth(request)
resp, err := cluster.client.Do(request)
if err != nil {
log.Error(err)
}
defer resp.Body.Close()

if body, err := ioutil.ReadAll(resp.Body); err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
} else {
for k, v := range resp.Header {
w.Header().Set(k, strings.Join(v, ","))
}
w.Write(body)
cluster.suv.broadcastEvent("execute ok : " + slaveUri)
}
}

func newDistributed(suv *Supervisor, hdlr http.Handler) error {
cluster.suv = suv

r := hdlr.(*mux.Router)
r.HandleFunc("/distributed/join", cluster.cmdJoinCluster).Methods("POST")
r.HandleFunc("/distributed/api/programs", cluster.cmdQueryDistributedPrograms).Methods("GET")
r.HandleFunc("/distributed/{slave}/settings/{name}", cluster.cmdSetting)
for _, path := range []string{
"/distributed/{slave}/api/programs", "/distributed/{slave}/api/programs/{name}",
"/distributed/{slave}/api/programs/{name}/start", "/distributed/{slave}/api/programs/{name}/stop",
} {
r.HandleFunc(path, cluster.slaveHttpProxy)
}
r.HandleFunc("/distributed/{slave}/ws/logs/{name}", cluster.cmdWebSocketProxy)
r.HandleFunc("/distributed/{slave}/ws/perfs/{name}", cluster.cmdWebSocketProxy)

if cfg.Server.Master != "" {
go func() {
t1 := time.NewTimer(time.Second)
for {
select {
case <-t1.C:
cluster.join()
t1.Reset(time.Second)
}
}
}()
}
return nil
}

var cluster = Cluster{
slaves: gcache.New(10).LRU().Expiration(time.Second * 3).Build(),
client: new(http.Client),
}
Loading

0 comments on commit 02f8a9a

Please sign in to comment.