Skip to content

Commit

Permalink
Merge pull request #273 from taosdata/main
Browse files Browse the repository at this point in the history
merge main to 3.0
  • Loading branch information
huskar-t authored Apr 18, 2024
2 parents 6630a50 + 70b88e8 commit 56e0494
Show file tree
Hide file tree
Showing 18 changed files with 152 additions and 95 deletions.
4 changes: 2 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func TestInit(t *testing.T) {
IdleTimeout: 0,
},
Monitor: Monitor{
Disable: false,
Disable: true,
CollectDuration: 3 * time.Second,
DisableClientIP: false,
DisableClientIP: true,
InCGroup: false,
PauseQueryMemoryThreshold: 70,
PauseAllMemoryThreshold: 80,
Expand Down
4 changes: 2 additions & 2 deletions config/config_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func TestInit(t *testing.T) {
IdleTimeout: 0,
},
Monitor: Monitor{
Disable: false,
Disable: true,
CollectDuration: 3 * time.Second,
DisableClientIP: false,
DisableClientIP: true,
InCGroup: false,
PauseQueryMemoryThreshold: 70,
PauseAllMemoryThreshold: 80,
Expand Down
8 changes: 4 additions & 4 deletions config/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ type Monitor struct {
}

func initMonitor() {
viper.SetDefault("monitor.disable", false)
viper.SetDefault("monitor.disable", true)
_ = viper.BindEnv("monitor.disable", "TAOS_ADAPTER_MONITOR_DISABLE")
pflag.Bool("monitor.disable", false, `Whether to disable monitoring. Env "TAOS_ADAPTER_MONITOR_DISABLE"`)
pflag.Bool("monitor.disable", true, `Whether to disable monitoring. Env "TAOS_ADAPTER_MONITOR_DISABLE"`)

viper.SetDefault("monitor.collectDuration", time.Second*3)
_ = viper.BindEnv("monitor.collectDuration", "TAOS_ADAPTER_MONITOR_COLLECT_DURATION")
pflag.Duration("monitor.collectDuration", time.Second*3, `Set monitor duration. Env "TAOS_ADAPTER_MONITOR_COLLECT_DURATION"`)

viper.SetDefault("monitor.disableCollectClientIP", false)
viper.SetDefault("monitor.disableCollectClientIP", true)
_ = viper.BindEnv("monitor.disableCollectClientIP", "TAOS_ADAPTER_MONITOR_DISABLE_COLLECT_CLIENT_IP")
pflag.Bool("monitor.disableCollectClientIP", false, `Whether to disable collecting clientIP. Env "TAOS_ADAPTER_MONITOR_DISABLE_COLLECT_CLIENT_IP"`)
pflag.Bool("monitor.disableCollectClientIP", true, `Whether to disable collecting clientIP. Env "TAOS_ADAPTER_MONITOR_DISABLE_COLLECT_CLIENT_IP"`)

viper.SetDefault("monitor.incgroup", false)
_ = viper.BindEnv("monitor.incgroup", "TAOS_ADAPTER_MONITOR_INCGROUP")
Expand Down
11 changes: 6 additions & 5 deletions controller/rest/restful.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"strconv"
"strings"
Expand All @@ -29,6 +28,7 @@ import (
"github.com/taosdata/taosadapter/v3/tools"
"github.com/taosdata/taosadapter/v3/tools/csv"
"github.com/taosdata/taosadapter/v3/tools/ctools"
"github.com/taosdata/taosadapter/v3/tools/iptool"
"github.com/taosdata/taosadapter/v3/tools/jsonbuilder"
"github.com/taosdata/taosadapter/v3/tools/layout"
"github.com/taosdata/taosadapter/v3/tools/pool"
Expand Down Expand Up @@ -167,7 +167,7 @@ func DoQuery(c *gin.Context, db string, timeFunc ctools.FormatTimeFunc, reqID in
if isDebug {
s = time.Now()
}
taosConnect, err := commonpool.GetConnection(user, password, net.ParseIP(c.RemoteIP()))
taosConnect, err := commonpool.GetConnection(user, password, iptool.GetRealIP(c.Request))
if isDebug {
logger.Debugln("connect server cost:", time.Since(s))
}
Expand Down Expand Up @@ -500,13 +500,14 @@ func (ctl *Restful) upload(c *gin.Context) {
user := c.MustGet(UserKey).(string)
password := c.MustGet(PasswordKey).(string)
s := log.GetLogNow(isDebug)
taosConnect, err := commonpool.GetConnection(user, password, net.ParseIP(c.RemoteIP()))
ip := iptool.GetRealIP(c.Request)
taosConnect, err := commonpool.GetConnection(user, password, ip)
if isDebug {
logger.Debugln("connect cost:", log.GetLogDuration(isDebug, s))
}

if err != nil {
logger.WithError(err).Error("connect server error")
logger.WithField("ip", ip).WithError(err).Error("connect server error")
var tError *tErrors.TaosError
if errors.Is(err, commonpool.ErrWhitelistForbidden) {
ForbiddenResponse(c, commonpool.ErrWhitelistForbidden.Error())
Expand Down Expand Up @@ -674,7 +675,7 @@ func (ctl *Restful) des(c *gin.Context) {
BadRequestResponse(c, httperror.HTTP_GEN_TAOSD_TOKEN_ERR)
return
}
conn, err := commonpool.GetConnection(user, password, net.ParseIP(c.RemoteIP()))
conn, err := commonpool.GetConnection(user, password, iptool.GetRealIP(c.Request))
if err != nil {
if errors.Is(err, commonpool.ErrWhitelistForbidden) {
ForbiddenResponse(c, commonpool.ErrWhitelistForbidden.Error())
Expand Down
4 changes: 2 additions & 2 deletions controller/rest/table_vgid.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rest

import (
"errors"
"net"
"net/http"

"github.com/gin-gonic/gin"
Expand All @@ -11,6 +10,7 @@ import (
"github.com/taosdata/taosadapter/v3/db/commonpool"
"github.com/taosdata/taosadapter/v3/log"
"github.com/taosdata/taosadapter/v3/thread"
"github.com/taosdata/taosadapter/v3/tools/iptool"
)

func (ctl *Restful) tableVgID(c *gin.Context) {
Expand All @@ -30,7 +30,7 @@ func (ctl *Restful) tableVgID(c *gin.Context) {
password := c.MustGet(PasswordKey).(string)
isDebug := log.IsDebug()
s := log.GetLogNow(isDebug)
taosConn, err := commonpool.GetConnection(user, password, net.ParseIP(c.RemoteIP()))
taosConn, err := commonpool.GetConnection(user, password, iptool.GetRealIP(c.Request))
logger.Debugln("connect cost:", log.GetLogDuration(isDebug, s))
if err != nil {
logger.WithError(err).Error("connect server error")
Expand Down
11 changes: 5 additions & 6 deletions controller/ws/query/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
"encoding/json"
"net"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -27,6 +26,7 @@ import (
"github.com/taosdata/taosadapter/v3/monitor"
"github.com/taosdata/taosadapter/v3/thread"
"github.com/taosdata/taosadapter/v3/tools"
"github.com/taosdata/taosadapter/v3/tools/iptool"
"github.com/taosdata/taosadapter/v3/tools/jsontype"
)

Expand Down Expand Up @@ -220,8 +220,7 @@ type Taos struct {
}

func NewTaos(session *melody.Session) *Taos {
host, _, _ := net.SplitHostPort(strings.TrimSpace(session.Request.RemoteAddr))
ipAddr := net.ParseIP(host)
ipAddr := iptool.GetRealIP(session.Request)
return &Taos{
Results: list.New(),
exit: make(chan struct{}, 1),
Expand All @@ -246,7 +245,7 @@ func (t *Taos) waitSignal() {
return
}
logger := wstool.GetLogger(t.session)
logger.WithField("clientIP", t.session.Request.RemoteAddr).Info("user dropped! close connection!")
logger.WithField("clientIP", t.ipStr).Info("user dropped! close connection!")
t.session.Close()
t.Unlock()
t.Close()
Expand All @@ -259,15 +258,15 @@ func (t *Taos) waitSignal() {
}
whitelist, err := tool.GetWhitelist(t.conn)
if err != nil {
wstool.GetLogger(t.session).WithField("clientIP", t.session.Request.RemoteAddr).WithError(err).Errorln("get whitelist error! close connection!")
wstool.GetLogger(t.session).WithField("clientIP", t.ipStr).WithError(err).Errorln("get whitelist error! close connection!")
t.session.Close()
t.Unlock()
t.Close()
return
}
valid := tool.CheckWhitelist(whitelist, t.ip)
if !valid {
wstool.GetLogger(t.session).WithField("clientIP", t.session.Request.RemoteAddr).Errorln("ip not in whitelist! close connection!")
wstool.GetLogger(t.session).WithField("clientIP", t.ipStr).Errorln("ip not in whitelist! close connection!")
t.session.Close()
t.Unlock()
t.Close()
Expand Down
13 changes: 7 additions & 6 deletions controller/ws/schemaless/schemaless.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"net"
"strings"
"sync"
"time"
"unsafe"
Expand All @@ -21,6 +20,7 @@ import (
"github.com/taosdata/taosadapter/v3/db/tool"
"github.com/taosdata/taosadapter/v3/log"
"github.com/taosdata/taosadapter/v3/thread"
"github.com/taosdata/taosadapter/v3/tools/iptool"
)

type SchemalessController struct {
Expand Down Expand Up @@ -137,19 +137,20 @@ type TaosSchemaless struct {
dropUserNotify chan struct{}
session *melody.Session
ip net.IP
ipStr string
wg sync.WaitGroup
sync.Mutex
}

func NewTaosSchemaless(session *melody.Session) *TaosSchemaless {
host, _, _ := net.SplitHostPort(strings.TrimSpace(session.Request.RemoteAddr))
ipAddr := net.ParseIP(host)
ipAddr := iptool.GetRealIP(session.Request)
return &TaosSchemaless{
exit: make(chan struct{}),
whitelistChangeChan: make(chan int64, 1),
dropUserNotify: make(chan struct{}, 1),
session: session,
ip: ipAddr,
ipStr: ipAddr.String(),
}
}

Expand All @@ -166,7 +167,7 @@ func (t *TaosSchemaless) waitSignal() {
return
}
logger := wstool.GetLogger(t.session)
logger.WithField("clientIP", t.session.Request.RemoteAddr).Info("user dropped! close connection!")
logger.WithField("clientIP", t.ipStr).Info("user dropped! close connection!")
t.session.Close()
t.Unlock()
t.close()
Expand All @@ -179,14 +180,14 @@ func (t *TaosSchemaless) waitSignal() {
}
whitelist, err := tool.GetWhitelist(t.conn)
if err != nil {
wstool.GetLogger(t.session).WithField("clientIP", t.session.Request.RemoteAddr).WithError(err).Errorln("get whitelist error! close connection!")
wstool.GetLogger(t.session).WithField("clientIP", t.ipStr).WithError(err).Errorln("get whitelist error! close connection!")
t.session.Close()
t.Unlock()
return
}
valid := tool.CheckWhitelist(whitelist, t.ip)
if !valid {
wstool.GetLogger(t.session).WithField("clientIP", t.session.Request.RemoteAddr).Errorln("ip not in whitelist! close connection!")
wstool.GetLogger(t.session).WithField("clientIP", t.ipStr).Errorln("ip not in whitelist! close connection!")
t.session.Close()
t.Unlock()
t.close()
Expand Down
13 changes: 7 additions & 6 deletions controller/ws/stmt/stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/json"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -30,6 +29,7 @@ import (
"github.com/taosdata/taosadapter/v3/log"
"github.com/taosdata/taosadapter/v3/thread"
"github.com/taosdata/taosadapter/v3/tools"
"github.com/taosdata/taosadapter/v3/tools/iptool"
)

type STMTController struct {
Expand Down Expand Up @@ -251,6 +251,7 @@ type TaosStmt struct {
dropUserNotify chan struct{}
session *melody.Session
ip net.IP
ipStr string
wg sync.WaitGroup
sync.Mutex
}
Expand All @@ -268,7 +269,7 @@ func (t *TaosStmt) waitSignal() {
return
}
logger := wstool.GetLogger(t.session)
logger.WithField("clientIP", t.session.Request.RemoteAddr).Info("user dropped! close connection!")
logger.WithField("clientIP", t.ipStr).Info("user dropped! close connection!")
t.session.Close()
t.Unlock()
t.Close()
Expand All @@ -281,15 +282,15 @@ func (t *TaosStmt) waitSignal() {
}
whitelist, err := tool.GetWhitelist(t.conn)
if err != nil {
wstool.GetLogger(t.session).WithField("clientIP", t.session.Request.RemoteAddr).WithError(err).Errorln("get whitelist error! close connection!")
wstool.GetLogger(t.session).WithField("clientIP", t.ipStr).WithError(err).Errorln("get whitelist error! close connection!")
t.session.Close()
t.Unlock()
t.Close()
return
}
valid := tool.CheckWhitelist(whitelist, t.ip)
if !valid {
wstool.GetLogger(t.session).WithField("clientIP", t.session.Request.RemoteAddr).Errorln("ip not in whitelist! close connection!")
wstool.GetLogger(t.session).WithField("clientIP", t.ipStr).Errorln("ip not in whitelist! close connection!")
t.session.Close()
t.Unlock()
t.Close()
Expand Down Expand Up @@ -319,15 +320,15 @@ func (s *StmtItem) clean() {
}

func NewTaosStmt(session *melody.Session) *TaosStmt {
host, _, _ := net.SplitHostPort(strings.TrimSpace(session.Request.RemoteAddr))
ipAddr := net.ParseIP(host)
ipAddr := iptool.GetRealIP(session.Request)
return &TaosStmt{
StmtList: list.New(),
exit: make(chan struct{}),
whitelistChangeChan: make(chan int64, 1),
dropUserNotify: make(chan struct{}, 1),
session: session,
ip: ipAddr,
ipStr: ipAddr.String(),
}
}

Expand Down
13 changes: 7 additions & 6 deletions controller/ws/tmq/tmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -29,6 +28,7 @@ import (
"github.com/taosdata/taosadapter/v3/db/tool"
"github.com/taosdata/taosadapter/v3/httperror"
"github.com/taosdata/taosadapter/v3/log"
"github.com/taosdata/taosadapter/v3/tools/iptool"
"github.com/taosdata/taosadapter/v3/tools/jsontype"
)

Expand Down Expand Up @@ -244,6 +244,7 @@ type TMQ struct {
whitelistChangeChan chan int64
session *melody.Session
ip net.IP
ipStr string
wg sync.WaitGroup
conn unsafe.Pointer
sync.Mutex
Expand All @@ -260,8 +261,7 @@ type Message struct {
}

func NewTaosTMQ(session *melody.Session) *TMQ {
host, _, _ := net.SplitHostPort(strings.TrimSpace(session.Request.RemoteAddr))
ipAddr := net.ParseIP(host)
ipAddr := iptool.GetRealIP(session.Request)
return &TMQ{
tmpMessage: &Message{buffer: &bytes.Buffer{}},
handler: tmqhandle.GlobalTMQHandlerPoll.Get(),
Expand All @@ -273,6 +273,7 @@ func NewTaosTMQ(session *melody.Session) *TMQ {
dropUserNotify: make(chan struct{}, 1),
session: session,
ip: ipAddr,
ipStr: ipAddr.String(),
}
}

Expand All @@ -289,7 +290,7 @@ func (t *TMQ) waitSignal() {
return
}
logger := wstool.GetLogger(t.session)
logger.WithField("clientIP", t.session.Request.RemoteAddr).Info("user dropped! close connection!")
logger.WithField("clientIP", t.ipStr).Info("user dropped! close connection!")
t.session.Close()
t.Unlock()
t.Close(logger)
Expand All @@ -303,7 +304,7 @@ func (t *TMQ) waitSignal() {
whitelist, err := tool.GetWhitelist(t.conn)
if err != nil {
logger := wstool.GetLogger(t.session)
logger.WithField("clientIP", t.session.Request.RemoteAddr).WithError(err).Errorln("get whitelist error! close connection!")
logger.WithField("clientIP", t.ipStr).WithError(err).Errorln("get whitelist error! close connection!")
t.session.Close()
t.Unlock()
t.Close(logger)
Expand All @@ -312,7 +313,7 @@ func (t *TMQ) waitSignal() {
valid := tool.CheckWhitelist(whitelist, t.ip)
if !valid {
logger := wstool.GetLogger(t.session)
logger.WithField("clientIP", t.session.Request.RemoteAddr).Errorln("ip not in whitelist! close connection!")
logger.WithField("clientIP", t.ipStr).Errorln("ip not in whitelist! close connection!")
t.session.Close()
t.Unlock()
t.Close(logger)
Expand Down
Loading

0 comments on commit 56e0494

Please sign in to comment.