Skip to content

Commit

Permalink
test: add tmq test
Browse files Browse the repository at this point in the history
  • Loading branch information
huskar-t committed Dec 12, 2024
1 parent e979d6a commit 2eb8292
Showing 1 changed file with 140 additions and 108 deletions.
248 changes: 140 additions & 108 deletions controller/ws/tmq/tmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3156,56 +3156,56 @@ func TestTMQ_SetMsgConsumeExcluded(t *testing.T) {
assert.Equal(t, 0, subscribeResp.Code, subscribeResp.Message)
}

//func TestDropUser(t *testing.T) {
// defer doHttpSql("drop user test_tmq_drop_user")
// code, message := doHttpSql("create user test_tmq_drop_user pass 'pass_123'")
// assert.Equal(t, 0, code, message)
//
// dbName := "test_ws_tmq_drop_user"
// topic := "test_ws_tmq_drop_user_topic"
//
// before(t, dbName, topic)
//
// s := httptest.NewServer(router)
// defer s.Close()
// ws, _, err := websocket.DefaultDialer.Dial("ws"+strings.TrimPrefix(s.URL, "http")+"/rest/tmq", nil)
// if err != nil {
// t.Error(err)
// return
// }
// defer func() {
// err = ws.Close()
// assert.NoError(t, err)
// }()
//
// defer func() {
// err = after(ws, dbName, topic)
// assert.NoError(t, err)
// }()
//
// // subscribe
// b, _ := json.Marshal(TMQSubscribeReq{
// User: "test_tmq_drop_user",
// Password: "pass_123",
// DB: dbName,
// GroupID: "test",
// Topics: []string{topic},
// AutoCommit: "false",
// OffsetReset: "earliest",
// })
// msg, err := doWebSocket(ws, TMQSubscribe, b)
// assert.NoError(t, err)
// var subscribeResp TMQSubscribeResp
// err = json.Unmarshal(msg, &subscribeResp)
// assert.NoError(t, err)
// assert.Equal(t, 0, subscribeResp.Code, subscribeResp.Message)
// // drop user
// code, message = doHttpSql("drop user test_tmq_drop_user")
// assert.Equal(t, 0, code, message)
// time.Sleep(time.Second * 3)
// resp, err := doWebSocket(ws, wstool.ClientVersion, nil)
// assert.Error(t, err, string(resp))
//}
func TestDropUser(t *testing.T) {
defer doHttpSql("drop user test_tmq_drop_user")
code, message := doHttpSql("create user test_tmq_drop_user pass 'pass_123'")
assert.Equal(t, 0, code, message)

dbName := "test_ws_tmq_drop_user"
topic := "test_ws_tmq_drop_user_topic"

before(t, dbName, topic)

s := httptest.NewServer(router)
defer s.Close()
ws, _, err := websocket.DefaultDialer.Dial("ws"+strings.TrimPrefix(s.URL, "http")+"/rest/tmq", nil)
if err != nil {
t.Error(err)
return
}
defer func() {
err = ws.Close()
assert.NoError(t, err)
}()

defer func() {
err = after(ws, dbName, topic)
assert.NoError(t, err)
}()

// subscribe
b, _ := json.Marshal(TMQSubscribeReq{
User: "test_tmq_drop_user",
Password: "pass_123",
DB: dbName,
GroupID: "test",
Topics: []string{topic},
AutoCommit: "false",
OffsetReset: "earliest",
})
msg, err := doWebSocket(ws, TMQSubscribe, b)
assert.NoError(t, err)
var subscribeResp TMQSubscribeResp
err = json.Unmarshal(msg, &subscribeResp)
assert.NoError(t, err)
assert.Equal(t, 0, subscribeResp.Code, subscribeResp.Message)
// drop user
code, message = doHttpSql("drop user test_tmq_drop_user")
assert.Equal(t, 0, code, message)
time.Sleep(time.Second * 3)
resp, err := doWebSocket(ws, wstool.ClientVersion, nil)
assert.Error(t, err, string(resp))
}

//type httpQueryResp struct {
// Code int `json:"code,omitempty"`
Expand All @@ -3214,7 +3214,7 @@ func TestTMQ_SetMsgConsumeExcluded(t *testing.T) {
// Data [][]driver.Value `json:"data,omitempty"`
// Rows int `json:"rows,omitempty"`
//}

//
//func restQuery(sql string, db string) *httpQueryResp {
// w := httptest.NewRecorder()
// body := strings.NewReader(sql)
Expand All @@ -3238,60 +3238,92 @@ func TestTMQ_SetMsgConsumeExcluded(t *testing.T) {
// return &res
//}

// not supported yet
//func TestConnectionOptions(t *testing.T) {
// dbName := "test_ws_tmq_conn_options"
// topic := "test_ws_tmq_conn_options_topic"
//
// before(t, dbName, topic)
//
// s := httptest.NewServer(router)
// defer s.Close()
// ws, _, err := websocket.DefaultDialer.Dial("ws"+strings.TrimPrefix(s.URL, "http")+"/rest/tmq", nil)
// if err != nil {
// t.Error(err)
// return
// }
// defer func() {
// err = ws.Close()
// assert.NoError(t, err)
// }()
//
// defer func() {
// err = after(ws, dbName, topic)
// assert.NoError(t, err)
// }()
//
// // subscribe
// b, _ := json.Marshal(TMQSubscribeReq{
// User: "root",
// Password: "taosdata",
// DB: dbName,
// GroupID: "test",
// Topics: []string{topic},
// AutoCommit: "false",
// OffsetReset: "earliest",
// SessionTimeoutMS: "100000",
// App: "tmq_test_conn_protocol",
// IP: "192.168.55.55",
// TZ: "Asia/Shanghai",
// })
// msg, err := doWebSocket(ws, TMQSubscribe, b)
// assert.NoError(t, err)
// var subscribeResp TMQSubscribeResp
// err = json.Unmarshal(msg, &subscribeResp)
// assert.NoError(t, err)
// assert.Equal(t, 0, subscribeResp.Code, subscribeResp.Message)
//
// // check connection options
// got := false
// for i := 0; i < 10; i++ {
// queryResp := restQuery("select conn_id from performance_schema.perf_connections where user_app = 'tmq_test_conn_protocol' and user_ip = '192.168.55.55'", "")
// if queryResp.Code == 0 && len(queryResp.Data) > 0 {
// got = true
// break
// }
// time.Sleep(time.Second)
// }
// assert.True(t, got)
//}
func TestConnectionOptions(t *testing.T) {
dbName := "test_ws_tmq_conn_options"
topic := "test_ws_tmq_conn_options_topic"

before(t, dbName, topic)

s := httptest.NewServer(router)
defer s.Close()
ws, _, err := websocket.DefaultDialer.Dial("ws"+strings.TrimPrefix(s.URL, "http")+"/rest/tmq", nil)
if err != nil {
t.Error(err)
return
}
defer func() {
err = ws.Close()
assert.NoError(t, err)
}()

defer func() {
err = after(ws, dbName, topic)
assert.NoError(t, err)
}()

// subscribe
b, _ := json.Marshal(TMQSubscribeReq{
User: "root",
Password: "taosdata",
DB: dbName,
GroupID: "test",
Topics: []string{topic},
AutoCommit: "false",
OffsetReset: "earliest",
SessionTimeoutMS: "100000",
App: "tmq_test_conn_protocol",
IP: "192.168.55.55",
TZ: "Asia/Shanghai",
})
msg, err := doWebSocket(ws, TMQSubscribe, b)
assert.NoError(t, err)
var subscribeResp TMQSubscribeResp
err = json.Unmarshal(msg, &subscribeResp)
assert.NoError(t, err)
assert.Equal(t, 0, subscribeResp.Code, subscribeResp.Message)

// todo: check connection options, C not implemented
//got := false
//for i := 0; i < 10; i++ {
// queryResp := restQuery("select conn_id from performance_schema.perf_connections where user_app = 'tmq_test_conn_protocol' and user_ip = '192.168.55.55'", "")
// if queryResp.Code == 0 && len(queryResp.Data) > 0 {
// got = true
// break
// }
// time.Sleep(time.Second)
//}
//assert.True(t, got)
}

func TestWrongPass(t *testing.T) {
s := httptest.NewServer(router)
defer s.Close()
ws, _, err := websocket.DefaultDialer.Dial("ws"+strings.TrimPrefix(s.URL, "http")+"/rest/tmq", nil)
if err != nil {
t.Error(err)
return
}
defer func() {
err = ws.Close()
assert.NoError(t, err)
}()
// subscribe
b, _ := json.Marshal(TMQSubscribeReq{
User: "root",
Password: "wrong_pass",
GroupID: "test",
Topics: []string{"test"},
AutoCommit: "false",
OffsetReset: "earliest",
SessionTimeoutMS: "100000",
App: "tmq_test_conn_protocol",
IP: "192.168.55.55",
TZ: "Asia/Shanghai",
})
msg, err := doWebSocket(ws, TMQSubscribe, b)
assert.NoError(t, err)
var subscribeResp TMQSubscribeResp
err = json.Unmarshal(msg, &subscribeResp)
assert.NoError(t, err)
assert.NotEqual(t, 0, subscribeResp.Code, subscribeResp.Message)
}

0 comments on commit 2eb8292

Please sign in to comment.