From 2eb829246af6658617219922c45d995f77742106 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Thu, 12 Dec 2024 13:52:24 +0800 Subject: [PATCH] test: add tmq test --- controller/ws/tmq/tmq_test.go | 248 +++++++++++++++++++--------------- 1 file changed, 140 insertions(+), 108 deletions(-) diff --git a/controller/ws/tmq/tmq_test.go b/controller/ws/tmq/tmq_test.go index 47e26de..e8f10ab 100644 --- a/controller/ws/tmq/tmq_test.go +++ b/controller/ws/tmq/tmq_test.go @@ -3156,56 +3156,56 @@ func TestTMQ_SetMsgConsumeExcluded(t *testing.T) { assert.Equal(t, 0, subscribeResp.Code, subscribeResp.Message) } -//func TestDropUser(t *testing.T) { -// defer doHttpSql("drop user test_tmq_drop_user") -// code, message := doHttpSql("create user test_tmq_drop_user pass 'pass_123'") -// assert.Equal(t, 0, code, message) -// -// dbName := "test_ws_tmq_drop_user" -// topic := "test_ws_tmq_drop_user_topic" -// -// before(t, dbName, topic) -// -// s := httptest.NewServer(router) -// defer s.Close() -// ws, _, err := websocket.DefaultDialer.Dial("ws"+strings.TrimPrefix(s.URL, "http")+"/rest/tmq", nil) -// if err != nil { -// t.Error(err) -// return -// } -// defer func() { -// err = ws.Close() -// assert.NoError(t, err) -// }() -// -// defer func() { -// err = after(ws, dbName, topic) -// assert.NoError(t, err) -// }() -// -// // subscribe -// b, _ := json.Marshal(TMQSubscribeReq{ -// User: "test_tmq_drop_user", -// Password: "pass_123", -// DB: dbName, -// GroupID: "test", -// Topics: []string{topic}, -// AutoCommit: "false", -// OffsetReset: "earliest", -// }) -// msg, err := doWebSocket(ws, TMQSubscribe, b) -// assert.NoError(t, err) -// var subscribeResp TMQSubscribeResp -// err = json.Unmarshal(msg, &subscribeResp) -// assert.NoError(t, err) -// assert.Equal(t, 0, subscribeResp.Code, subscribeResp.Message) -// // drop user -// code, message = doHttpSql("drop user test_tmq_drop_user") -// assert.Equal(t, 0, code, message) -// time.Sleep(time.Second * 3) -// resp, err := doWebSocket(ws, wstool.ClientVersion, nil) -// assert.Error(t, err, string(resp)) -//} +func TestDropUser(t *testing.T) { + defer doHttpSql("drop user test_tmq_drop_user") + code, message := doHttpSql("create user test_tmq_drop_user pass 'pass_123'") + assert.Equal(t, 0, code, message) + + dbName := "test_ws_tmq_drop_user" + topic := "test_ws_tmq_drop_user_topic" + + before(t, dbName, topic) + + s := httptest.NewServer(router) + defer s.Close() + ws, _, err := websocket.DefaultDialer.Dial("ws"+strings.TrimPrefix(s.URL, "http")+"/rest/tmq", nil) + if err != nil { + t.Error(err) + return + } + defer func() { + err = ws.Close() + assert.NoError(t, err) + }() + + defer func() { + err = after(ws, dbName, topic) + assert.NoError(t, err) + }() + + // subscribe + b, _ := json.Marshal(TMQSubscribeReq{ + User: "test_tmq_drop_user", + Password: "pass_123", + DB: dbName, + GroupID: "test", + Topics: []string{topic}, + AutoCommit: "false", + OffsetReset: "earliest", + }) + msg, err := doWebSocket(ws, TMQSubscribe, b) + assert.NoError(t, err) + var subscribeResp TMQSubscribeResp + err = json.Unmarshal(msg, &subscribeResp) + assert.NoError(t, err) + assert.Equal(t, 0, subscribeResp.Code, subscribeResp.Message) + // drop user + code, message = doHttpSql("drop user test_tmq_drop_user") + assert.Equal(t, 0, code, message) + time.Sleep(time.Second * 3) + resp, err := doWebSocket(ws, wstool.ClientVersion, nil) + assert.Error(t, err, string(resp)) +} //type httpQueryResp struct { // Code int `json:"code,omitempty"` @@ -3214,7 +3214,7 @@ func TestTMQ_SetMsgConsumeExcluded(t *testing.T) { // Data [][]driver.Value `json:"data,omitempty"` // Rows int `json:"rows,omitempty"` //} - +// //func restQuery(sql string, db string) *httpQueryResp { // w := httptest.NewRecorder() // body := strings.NewReader(sql) @@ -3238,60 +3238,92 @@ func TestTMQ_SetMsgConsumeExcluded(t *testing.T) { // return &res //} -// not supported yet -//func TestConnectionOptions(t *testing.T) { -// dbName := "test_ws_tmq_conn_options" -// topic := "test_ws_tmq_conn_options_topic" -// -// before(t, dbName, topic) -// -// s := httptest.NewServer(router) -// defer s.Close() -// ws, _, err := websocket.DefaultDialer.Dial("ws"+strings.TrimPrefix(s.URL, "http")+"/rest/tmq", nil) -// if err != nil { -// t.Error(err) -// return -// } -// defer func() { -// err = ws.Close() -// assert.NoError(t, err) -// }() -// -// defer func() { -// err = after(ws, dbName, topic) -// assert.NoError(t, err) -// }() -// -// // subscribe -// b, _ := json.Marshal(TMQSubscribeReq{ -// User: "root", -// Password: "taosdata", -// DB: dbName, -// GroupID: "test", -// Topics: []string{topic}, -// AutoCommit: "false", -// OffsetReset: "earliest", -// SessionTimeoutMS: "100000", -// App: "tmq_test_conn_protocol", -// IP: "192.168.55.55", -// TZ: "Asia/Shanghai", -// }) -// msg, err := doWebSocket(ws, TMQSubscribe, b) -// assert.NoError(t, err) -// var subscribeResp TMQSubscribeResp -// err = json.Unmarshal(msg, &subscribeResp) -// assert.NoError(t, err) -// assert.Equal(t, 0, subscribeResp.Code, subscribeResp.Message) -// -// // check connection options -// got := false -// for i := 0; i < 10; i++ { -// queryResp := restQuery("select conn_id from performance_schema.perf_connections where user_app = 'tmq_test_conn_protocol' and user_ip = '192.168.55.55'", "") -// if queryResp.Code == 0 && len(queryResp.Data) > 0 { -// got = true -// break -// } -// time.Sleep(time.Second) -// } -// assert.True(t, got) -//} +func TestConnectionOptions(t *testing.T) { + dbName := "test_ws_tmq_conn_options" + topic := "test_ws_tmq_conn_options_topic" + + before(t, dbName, topic) + + s := httptest.NewServer(router) + defer s.Close() + ws, _, err := websocket.DefaultDialer.Dial("ws"+strings.TrimPrefix(s.URL, "http")+"/rest/tmq", nil) + if err != nil { + t.Error(err) + return + } + defer func() { + err = ws.Close() + assert.NoError(t, err) + }() + + defer func() { + err = after(ws, dbName, topic) + assert.NoError(t, err) + }() + + // subscribe + b, _ := json.Marshal(TMQSubscribeReq{ + User: "root", + Password: "taosdata", + DB: dbName, + GroupID: "test", + Topics: []string{topic}, + AutoCommit: "false", + OffsetReset: "earliest", + SessionTimeoutMS: "100000", + App: "tmq_test_conn_protocol", + IP: "192.168.55.55", + TZ: "Asia/Shanghai", + }) + msg, err := doWebSocket(ws, TMQSubscribe, b) + assert.NoError(t, err) + var subscribeResp TMQSubscribeResp + err = json.Unmarshal(msg, &subscribeResp) + assert.NoError(t, err) + assert.Equal(t, 0, subscribeResp.Code, subscribeResp.Message) + + // todo: check connection options, C not implemented + //got := false + //for i := 0; i < 10; i++ { + // queryResp := restQuery("select conn_id from performance_schema.perf_connections where user_app = 'tmq_test_conn_protocol' and user_ip = '192.168.55.55'", "") + // if queryResp.Code == 0 && len(queryResp.Data) > 0 { + // got = true + // break + // } + // time.Sleep(time.Second) + //} + //assert.True(t, got) +} + +func TestWrongPass(t *testing.T) { + s := httptest.NewServer(router) + defer s.Close() + ws, _, err := websocket.DefaultDialer.Dial("ws"+strings.TrimPrefix(s.URL, "http")+"/rest/tmq", nil) + if err != nil { + t.Error(err) + return + } + defer func() { + err = ws.Close() + assert.NoError(t, err) + }() + // subscribe + b, _ := json.Marshal(TMQSubscribeReq{ + User: "root", + Password: "wrong_pass", + GroupID: "test", + Topics: []string{"test"}, + AutoCommit: "false", + OffsetReset: "earliest", + SessionTimeoutMS: "100000", + App: "tmq_test_conn_protocol", + IP: "192.168.55.55", + TZ: "Asia/Shanghai", + }) + msg, err := doWebSocket(ws, TMQSubscribe, b) + assert.NoError(t, err) + var subscribeResp TMQSubscribeResp + err = json.Unmarshal(msg, &subscribeResp) + assert.NoError(t, err) + assert.NotEqual(t, 0, subscribeResp.Code, subscribeResp.Message) +}