diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 9213f00b..e854f4f0 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -688,7 +688,7 @@ jobs: "db_collections": { "foo": [ { - "name": "*" + "name": "*" } ] } @@ -696,12 +696,16 @@ jobs: }' - name: Run test - timeout-minutes: 15 + timeout-minutes: 30 shell: bash working-directory: tests run: | pip install -r requirements.txt --trusted-host https://test.pypi.org pytest testcases/test_cdc_database.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500 + pytest testcases/test_cdc_get.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500 + pytest testcases/test_cdc_list.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500 + pytest testcases/test_cdc_pause.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500 + pytest testcases/test_cdc_resume.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500 - name: List CDC task if: ${{ always() }} diff --git a/server/cdc_impl.go b/server/cdc_impl.go index 596ba9d9..8f312bb0 100644 --- a/server/cdc_impl.go +++ b/server/cdc_impl.go @@ -327,13 +327,6 @@ func (e *MetaCDC) checkDuplicateCollection(uKey string, } if names, ok := e.collectionNames.data[uKey]; ok { var duplicateCollections []string - containsAny := false - for _, name := range names { - d, c := util.GetCollectionNameFromFull(name) - if d == cdcreader.AllDatabase || c == cdcreader.AllCollection { - containsAny = true - } - } for _, newCollectionName := range newCollectionNames { if lo.Contains(names, newCollectionName) { duplicateCollections = append(duplicateCollections, newCollectionName) @@ -343,9 +336,12 @@ func (e *MetaCDC) checkDuplicateCollection(uKey string, if nd == cdcreader.AllDatabase && nc == cdcreader.AllCollection { 
continue } - if containsAny && !lo.Contains(e.collectionNames.excludeData[uKey], newCollectionName) { - duplicateCollections = append(duplicateCollections, newCollectionName) - continue + for _, name := range names { + match, containAny := matchCollectionName(name, newCollectionName) + if match && containAny && !lo.Contains(e.collectionNames.excludeData[uKey], newCollectionName) { + duplicateCollections = append(duplicateCollections, newCollectionName) + break + } } } if len(duplicateCollections) > 0 { @@ -886,7 +882,7 @@ func (e *MetaCDC) newReplicateEntity(info *meta.TaskInfo) (*ReplicateEntity, err } if err != nil { taskLog.Warn("fail to new the data handler", zap.Error(err)) - return nil, servererror.NewClientError("fail to new the data handler, task_id: ") + return nil, servererror.NewClientError("fail to new the data handler, task_id: " + info.TaskID) } writerObj := cdcwriter.NewChannelWriter(dataHandler, config.WriterConfig{ MessageBufferSize: bufferSize, diff --git a/server/cdc_impl_test.go b/server/cdc_impl_test.go index 6f08722e..3aa931e2 100644 --- a/server/cdc_impl_test.go +++ b/server/cdc_impl_test.go @@ -1591,6 +1591,25 @@ func TestCheckDuplicateCollection(t *testing.T) { assert.Len(t, excludeCollections, 0) }) + t.Run("collection duplicate test", func(t *testing.T) { + metaCDC := &MetaCDC{} + initMetaCDCMap(metaCDC) + excludeCollections, err := metaCDC.checkDuplicateCollection("foo", []string{ + util.GetFullCollectionName("foo", "*"), + }, model.ExtraInfo{ + EnableUserRole: true, + }, nil) + assert.NoError(t, err) + assert.Len(t, excludeCollections, 0) + + _, err = metaCDC.checkDuplicateCollection("foo", []string{ + util.GetFullCollectionName("default", "col1"), + }, model.ExtraInfo{ + EnableUserRole: false, + }, nil) + assert.NoError(t, err) + }) + t.Run("map collection name", func(t *testing.T) { metaCDC := &MetaCDC{} initMetaCDCMap(metaCDC) diff --git a/tests/api/milvus_cdc.py b/tests/api/milvus_cdc.py index dbcabbcc..66815b23 100644 --- 
a/tests/api/milvus_cdc.py +++ b/tests/api/milvus_cdc.py @@ -1,6 +1,7 @@ import json import requests +DEFAULT_TOKEN = 'root:Milvus' class MilvusCdcClient: @@ -17,7 +18,7 @@ def create_task(self, request_data): payload = json.dumps(body) response = requests.post(url, headers=self.headers, data=payload) if response.status_code == 200: - return response.json(), True + return response.json()['data'], True else: return response.text, False @@ -28,8 +29,8 @@ def list_tasks(self): } payload = json.dumps(body) response = requests.post(url, headers=self.headers, data=payload) - if response.status_code == 200: - return response.json(), True + if response.status_code == 200 and 'data' in response.json(): + return response.json()['data'], True else: return response.text, False @@ -43,8 +44,8 @@ def get_task(self, task_id): } payload = json.dumps(body) response = requests.post(url, headers=self.headers, data=payload) - if response.status_code == 200: - return response.json(), True + if response.status_code == 200 and 'data' in response.json(): + return response.json()['data']['task'], True else: return response.text, False diff --git a/tests/testcases/test_cdc_create.py b/tests/testcases/test_cdc_create.py index ff90b2e6..e4ebf38e 100644 --- a/tests/testcases/test_cdc_create.py +++ b/tests/testcases/test_cdc_create.py @@ -2,7 +2,7 @@ import time from datetime import datetime from utils.util_log import test_log as log -from api.milvus_cdc import MilvusCdcClient +from api.milvus_cdc import MilvusCdcClient, DEFAULT_TOKEN from pymilvus import ( connections, list_collections, Collection, Partition @@ -69,7 +69,7 @@ def test_cdc_for_collections_create_after_cdc_task(self, upstream_host, upstream # check collections in downstream connections.disconnect("default") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) timeout = 120 t0 = time.time() log.info(f"all collections in downstream 
{list_collections()}") @@ -131,7 +131,7 @@ def test_cdc_for_partitions_create_after_cdc_task(self, upstream_host, upstream_ assert set(p_name_list).issubset(set(list_partitions(col))) # check collections in downstream connections.disconnect("default") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) log.info(f"all collections in downstream {list_collections()}") t0 = time.time() timeout = 60 @@ -203,7 +203,7 @@ def test_cdc_for_collection_insert_after_cdc_task(self, upstream_host, upstream_ # check entities in downstream connections.disconnect("default") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) col = Collection(name=c_name) col.create_index(field_name="float_vector", index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}) @@ -232,147 +232,147 @@ def test_cdc_for_collection_insert_after_cdc_task(self, upstream_host, upstream_ log.info(f"num_entities in downstream: {num_entities_downstream}") assert num_entities_upstream == num_entities_downstream - def test_cdc_for_partition_insert_after_cdc_task(self, upstream_host, upstream_port, downstream_host, downstream_port): - connections.connect(host=upstream_host, port=upstream_port) - c_name = prefix + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') - c_infos = [ - {"name": c_name} - ] - # create a cdc task, not ignore partition - request_data = { - "milvus_connect_param": { - "host": downstream_host, - "port": int(downstream_port), - "username": "", - "password": "", - "enable_tls": False, - "ignore_partition": False, - "connect_timeout": 10 - }, - "collection_infos": c_infos - } - rsp, result = client.create_task(request_data) - assert result - log.info(f"create task response: {rsp}") - task_id = rsp['task_id'] - # get the cdc task - rsp, result = client.get_task(task_id) - 
assert result - log.info(f"get task {task_id} response: {rsp}") - # create collection in upstream - p_name = "p1" - checker = InsertEntitiesPartitionChecker(host=upstream_host, port=upstream_port, c_name=c_name, p_name=p_name) - checker.run() - time.sleep(60) - checker.pause() - # check entities in upstream - count_by_query_upstream = checker.get_count_by_query(p_name=p_name) - log.info(f"count_by_query in upstream: {count_by_query_upstream}") - num_entities_upstream = checker.get_num_entities(p_name=p_name) - log.info(f"num_entities in upstream: {num_entities_upstream}") + # def test_cdc_for_partition_insert_after_cdc_task(self, upstream_host, upstream_port, downstream_host, downstream_port): + # connections.connect(host=upstream_host, port=upstream_port) + # c_name = prefix + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') + # c_infos = [ + # {"name": c_name} + # ] + # # create a cdc task, not ignore partition + # request_data = { + # "milvus_connect_param": { + # "host": downstream_host, + # "port": int(downstream_port), + # "username": "", + # "password": "", + # "enable_tls": False, + # "ignore_partition": False, + # "connect_timeout": 10 + # }, + # "collection_infos": c_infos + # } + # rsp, result = client.create_task(request_data) + # assert result + # log.info(f"create task response: {rsp}") + # task_id = rsp['task_id'] + # # get the cdc task + # rsp, result = client.get_task(task_id) + # assert result + # log.info(f"get task {task_id} response: {rsp}") + # # create collection in upstream + # p_name = "p1" + # checker = InsertEntitiesPartitionChecker(host=upstream_host, port=upstream_port, c_name=c_name, p_name=p_name) + # checker.run() + # time.sleep(20) + # checker.pause() + # # check entities in upstream + # count_by_query_upstream = checker.get_count_by_query(p_name=p_name) + # log.info(f"count_by_query in upstream: {count_by_query_upstream}") + # num_entities_upstream = checker.get_num_entities(p_name=p_name) + # log.info(f"num_entities in upstream: 
{num_entities_upstream}") - # check entities in downstream - connections.disconnect("default") - connections.connect(host=downstream_host, port=downstream_port) - t0 = time.time() - timeout = 60 - while True and time.time() - t0 < timeout: - if c_name in list_collections(): - log.info(f"collection {c_name} has been synced") - break - time.sleep(1) - if time.time() - t0 > timeout: - raise Exception(f"Timeout waiting for collection {c_name} to be synced") - assert c_name in list_collections() - col = Collection(name=c_name) - col.create_index(field_name="float_vector", - index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}) - col.load() - # wait for the partition to be synced - timeout = 120 - count_by_query_downstream = len(col.query(expr="int64 >= 0", output_fields=["int64"], partition_names=[p_name])) - t0 = time.time() - while True and time.time() - t0 < timeout: - count_by_query_downstream = len(col.query(expr="int64 >= 0", output_fields=["int64"], partition_names=[p_name])) - log.info( - f"count_by_query_downstream {count_by_query_downstream}," - f"count_by_query_upstream {count_by_query_upstream}") - if count_by_query_downstream == count_by_query_upstream: - log.info(f"collection {c_name} has been synced") - break - time.sleep(1) - if time.time() - t0 > timeout: - raise Exception(f"Timeout waiting for collection {c_name} to be synced") - log.info(f"count_by_query in downstream: {count_by_query_downstream}") - assert count_by_query_upstream == count_by_query_downstream - # flush partition in downstream - p = Partition(col, p_name) - p.flush() - num_entities_downstream = p.num_entities - assert num_entities_upstream == num_entities_downstream,\ - f"num_entities_upstream {num_entities_upstream} != num_entities_downstream {num_entities_downstream}" + # # check entities in downstream + # connections.disconnect("default") + # connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) + # t0 = time.time() + 
# timeout = 60 + # while True and time.time() - t0 < timeout: + # if c_name in list_collections(): + # log.info(f"collection {c_name} has been synced") + # break + # time.sleep(1) + # if time.time() - t0 > timeout: + # raise Exception(f"Timeout waiting for collection {c_name} to be synced") + # assert c_name in list_collections() + # col = Collection(name=c_name) + # col.create_index(field_name="float_vector", + # index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}) + # col.load() + # # wait for the partition to be synced + # timeout = 120 + # count_by_query_downstream = len(col.query(expr="int64 >= 0", output_fields=["int64"], partition_names=[p_name])) + # t0 = time.time() + # while True and time.time() - t0 < timeout: + # count_by_query_downstream = len(col.query(expr="int64 >= 0", output_fields=["int64"], partition_names=[p_name])) + # log.info( + # f"count_by_query_downstream {count_by_query_downstream}," + # f"count_by_query_upstream {count_by_query_upstream}") + # if count_by_query_downstream == count_by_query_upstream: + # log.info(f"collection {c_name} has been synced") + # break + # time.sleep(1) + # if time.time() - t0 > timeout: + # raise Exception(f"Timeout waiting for collection {c_name} to be synced") + # log.info(f"count_by_query in downstream: {count_by_query_downstream}") + # assert count_by_query_upstream == count_by_query_downstream + # # flush partition in downstream + # p = Partition(col, p_name) + # p.flush() + # num_entities_downstream = p.num_entities + # assert num_entities_upstream == num_entities_downstream,\ + # f"num_entities_upstream {num_entities_upstream} != num_entities_downstream {num_entities_downstream}" - def test_cdc_for_cdc_task_large_than_max_num(self, upstream_host, upstream_port, downstream_host, downstream_port): - max_task = 100 - # delete the tasks - res, result = client.list_tasks() - for task in res["tasks"]: - task_id = task["task_id"] - rsp, result = client.delete_task(task_id) - 
log.info(f"delete task response: {rsp}") - assert result - res, result = client.list_tasks() - assert result - log.info(f"list tasks response: {res}") - num_tasks = len(res["tasks"]) - log.info(f"num_tasks: {num_tasks}") - assert num_tasks <= max_task - available_task = max_task - num_tasks - for i in range(available_task+3): - time.sleep(0.01) - c_name = prefix + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') - c_infos = [ - {"name": c_name} - ] - request_data = { - "milvus_connect_param": { - "host": downstream_host, - "port": int(downstream_port), - "username": "", - "password": "", - "enable_tls": False, - "ignore_partition": False, - "connect_timeout": 10 - }, - "collection_infos": c_infos - } - rsp, result = client.create_task(request_data) - if i < available_task: - assert result - log.info(f"create task response: {rsp}") - task_id = rsp['task_id'] - log.info(f"task_id: {task_id}") - else: - log.info(f"create task response: {rsp}") - # assert not result + # def test_cdc_for_cdc_task_large_than_max_num(self, upstream_host, upstream_port, downstream_host, downstream_port): + # max_task = 100 + # # delete the tasks + # res, result = client.list_tasks() + # for task in res["tasks"]: + # task_id = task["task_id"] + # rsp, result = client.delete_task(task_id) + # log.info(f"delete task response: {rsp}") + # assert result + # res, result = client.list_tasks() + # assert result + # log.info(f"list tasks response: {res}") + # num_tasks = len(res["tasks"]) + # log.info(f"num_tasks: {num_tasks}") + # assert num_tasks <= max_task + # available_task = max_task - num_tasks + # for i in range(available_task+3): + # time.sleep(0.01) + # c_name = prefix + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') + # c_infos = [ + # {"name": c_name} + # ] + # request_data = { + # "milvus_connect_param": { + # "host": downstream_host, + # "port": int(downstream_port), + # "username": "", + # "password": "", + # "enable_tls": False, + # "ignore_partition": False, + # "connect_timeout": 
10 + # }, + # "collection_infos": c_infos + # } + # rsp, result = client.create_task(request_data) + # if i < available_task: + # assert result + # log.info(f"create task response: {rsp}") + # task_id = rsp['task_id'] + # log.info(f"task_id: {task_id}") + # else: + # log.info(f"create task response: {rsp}") + # # assert not result - # check the number of tasks - res, result = client.list_tasks() - assert result - log.info(f"list tasks response: {res}") - num_tasks = len(res["tasks"]) - log.info(f"num_tasks: {num_tasks}") - assert num_tasks == max_task - # delete the tasks - for task in res["tasks"]: - task_id = task["task_id"] - rsp, result = client.delete_task(task_id) - log.info(f"delete task response: {rsp}") - assert result + # # check the number of tasks + # res, result = client.list_tasks() + # assert result + # log.info(f"list tasks response: {res}") + # num_tasks = len(res["tasks"]) + # log.info(f"num_tasks: {num_tasks}") + # assert num_tasks == max_task + # # delete the tasks + # for task in res["tasks"]: + # task_id = task["task_id"] + # rsp, result = client.delete_task(task_id) + # log.info(f"delete task response: {rsp}") + # assert result - # check the number of tasks - res, result = client.list_tasks() - assert result - log.info(f"list tasks response: {res}") - num_tasks = len(res["tasks"]) - assert num_tasks == 0 + # # check the number of tasks + # res, result = client.list_tasks() + # assert result + # log.info(f"list tasks response: {res}") + # num_tasks = len(res["tasks"]) + # assert num_tasks == 0 diff --git a/tests/testcases/test_cdc_database.py b/tests/testcases/test_cdc_database.py index d38c492d..e6d198f9 100644 --- a/tests/testcases/test_cdc_database.py +++ b/tests/testcases/test_cdc_database.py @@ -38,7 +38,7 @@ def test_cdc_sync_default_database_request(self, upstream_host, upstream_port, d """ connections.connect(host=upstream_host, port=upstream_port) col_list = [] - for i in range(10): + for i in range(5): time.sleep(0.1) collection_name 
= prefix + "not_match_database_" + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') col_list.append(collection_name) @@ -86,7 +86,7 @@ def test_cdc_sync_not_match_database_request(self, upstream_host, upstream_port, db.create_database("hoo") db.using_database(db_name="hoo") col_list = [] - for i in range(10): + for i in range(5): time.sleep(0.1) collection_name = prefix + "not_match_database_" + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') col_list.append(collection_name) diff --git a/tests/testcases/test_cdc_delete.py b/tests/testcases/test_cdc_delete.py index b3887437..d4271b5a 100644 --- a/tests/testcases/test_cdc_delete.py +++ b/tests/testcases/test_cdc_delete.py @@ -1,7 +1,7 @@ import time from datetime import datetime from utils.util_log import test_log as log -from api.milvus_cdc import MilvusCdcClient +from api.milvus_cdc import MilvusCdcClient, DEFAULT_TOKEN from pymilvus import ( connections, Collection @@ -11,7 +11,7 @@ ) from base.client_base import TestBase -prefix = "cdc_create_task_" +prefix = "cdc_delete_task_" client = MilvusCdcClient('http://localhost:8444') @@ -51,7 +51,7 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do connections.connect(host=upstream_host, port=upstream_port) checker = InsertEntitiesCollectionChecker(host=upstream_host, port=upstream_port, c_name=collection_name) checker.run() - time.sleep(60) + time.sleep(20) # pause the insert task log.info(f"start to pause the insert task") checker.pause() @@ -65,13 +65,13 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do # check the collection in downstream connections.disconnect("default") log.info(f"start to connect to downstream {downstream_host} {downstream_port}") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) collection = Collection(name=collection_name) collection.create_index(field_name="float_vector", 
index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}) collection.load() # wait for the collection to be synced - timeout = 60 + timeout = 20 t0 = time.time() count_by_query_downstream = len( collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) @@ -86,7 +86,7 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do log.info(f"count_by_query_downstream: {count_by_query_downstream}") assert count_by_query_upstream == count_by_query_downstream # wait for the collection to be flushed - time.sleep(20) + time.sleep(10) collection.flush() num_entities_downstream = collection.num_entities log.info(f"num_entities_downstream: {num_entities_downstream}") @@ -113,7 +113,7 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do connections.connect(host=upstream_host, port=upstream_port) # insert entities into the collection checker.resume() - time.sleep(60) + time.sleep(20) checker.pause() # check the collection in upstream count_by_query_upstream_second = checker.get_count_by_query() @@ -123,21 +123,24 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do log.info(f"num_entities_upstream_second: {num_entities_upstream_second}") assert num_entities_upstream_second > num_entities_upstream - # connect to downstream connections.disconnect("default") log.info(f"start to connect to downstream {downstream_host} {downstream_port}") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) + log.info("start to check the collection in downstream") # check the collection in downstream has not been synced - timeout = 60 + timeout = 10 t0 = time.time() + collection = Collection(name=collection_name) count_by_query_downstream_second = len( - collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) + 
collection.query(expr=checker.query_expr, output_fields=checker.output_fields, consistency_level="Eventually")) + log.info(f"start count_by_query_downstream_second: {count_by_query_downstream_second}") while True and time.time() - t0 < timeout: count_by_query_downstream_second = len( - collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) + collection.query(expr=checker.query_expr, output_fields=checker.output_fields, consistency_level="Eventually")) if count_by_query_downstream_second == count_by_query_upstream_second: - assert False + break time.sleep(1) + log.info(f"count_by_query_downstream_second: {count_by_query_downstream_second}") if time.time() - t0 > timeout: - log.info(f"count_by_query_downstream_second: {count_by_query_downstream_second}") + log.info(f"end count_by_query_downstream_second: {count_by_query_downstream_second}") assert count_by_query_downstream_second == count_by_query_downstream diff --git a/tests/testcases/test_cdc_get.py b/tests/testcases/test_cdc_get.py index c5550872..c8a866cb 100644 --- a/tests/testcases/test_cdc_get.py +++ b/tests/testcases/test_cdc_get.py @@ -24,7 +24,7 @@ def test_cdc_get_task(self, upstream_host, upstream_port, downstream_host, downs col_list = [] task_id_list = [] for i in range(10): - time.sleep(0.1) + time.sleep(1) collection_name = prefix + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') col_list.append(collection_name) request_data = { diff --git a/tests/testcases/test_cdc_pause.py b/tests/testcases/test_cdc_pause.py index 625c847f..d637e411 100644 --- a/tests/testcases/test_cdc_pause.py +++ b/tests/testcases/test_cdc_pause.py @@ -1,7 +1,7 @@ import time from datetime import datetime from utils.util_log import test_log as log -from api.milvus_cdc import MilvusCdcClient +from api.milvus_cdc import MilvusCdcClient, DEFAULT_TOKEN from pymilvus import ( connections, Collection @@ -52,7 +52,7 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow 
connections.connect(host=upstream_host, port=upstream_port) checker = InsertEntitiesCollectionChecker(host=upstream_host, port=upstream_port, c_name=collection_name) checker.run() - time.sleep(60) + time.sleep(20) # pause the insert task log.info(f"start to pause the insert task") checker.pause() @@ -66,7 +66,7 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow # check the collection in downstream connections.disconnect("default") log.info(f"start to connect to downstream {downstream_host} {downstream_port}") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) collection = Collection(name=collection_name) collection.create_index(field_name="float_vector", index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}) @@ -87,7 +87,7 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow log.info(f"count_by_query_downstream: {count_by_query_downstream}") assert count_by_query_upstream == count_by_query_downstream # wait for the collection to be flushed - time.sleep(20) + time.sleep(10) collection.flush() num_entities_downstream = collection.num_entities log.info(f"num_entities_downstream: {num_entities_downstream}") @@ -116,7 +116,7 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow connections.connect(host=upstream_host, port=upstream_port) # insert entities into the collection checker.resume() - time.sleep(60) + time.sleep(20) checker.pause() # check the collection in upstream count_by_query_upstream_second = checker.get_count_by_query() @@ -129,15 +129,16 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow # connect to downstream connections.disconnect("default") log.info(f"start to connect to downstream {downstream_host} {downstream_port}") - connections.connect(host=downstream_host, port=downstream_port) + 
connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) # check the collection in downstream has not been synced - timeout = 60 + timeout = 30 + collection = Collection(name=collection_name) count_by_query_downstream_second = len( - collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) + collection.query(expr=checker.query_expr, output_fields=checker.output_fields, consistency_level="Eventually")) t0 = time.time() while True and time.time() - t0 < timeout: count_by_query_downstream_second = len( - collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) + collection.query(expr=checker.query_expr, output_fields=checker.output_fields, consistency_level="Eventually")) if count_by_query_downstream_second == count_by_query_upstream_second: assert False time.sleep(1) diff --git a/tests/testcases/test_cdc_resume.py b/tests/testcases/test_cdc_resume.py index 9b03e255..5f7c478f 100644 --- a/tests/testcases/test_cdc_resume.py +++ b/tests/testcases/test_cdc_resume.py @@ -1,7 +1,7 @@ import time from datetime import datetime from utils.util_log import test_log as log -from api.milvus_cdc import MilvusCdcClient +from api.milvus_cdc import MilvusCdcClient, DEFAULT_TOKEN from pymilvus import ( connections, Collection @@ -11,7 +11,7 @@ ) from base.client_base import TestBase -prefix = "cdc_create_task_" +prefix = "cdc_resume_task_" client = MilvusCdcClient('http://localhost:8444') @@ -19,19 +19,21 @@ class TestCdcResume(TestBase): """ Test Milvus CDC delete """ def test_cdc_resume_task(self, upstream_host, upstream_port, downstream_host, downstream_port): - """ - target: test cdc delete task - method: create task, delete task - expected: create successfully, delete successfully - """ - collection_name = prefix + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') + collection_name1 = prefix + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') + '_1' + collection_name2 = prefix + 
datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') + "_2" + task_id1 = self.create_cdc_task(upstream_host, upstream_port, downstream_host, downstream_port, collection_name1) + task_id2 = self.create_cdc_task(upstream_host, upstream_port, downstream_host, downstream_port, collection_name2) + self.resume_task_with_collection_name(upstream_host, upstream_port, downstream_host, downstream_port, collection_name1, task_id1) + self.resume_task_with_collection_name(upstream_host, upstream_port, downstream_host, downstream_port, collection_name2, task_id2) + + def create_cdc_task(self, upstream_host, upstream_port, downstream_host, downstream_port, collection_name): # create cdc task request_data = { "milvus_connect_param": { "host": downstream_host, "port": int(downstream_port), - "username": "", - "password": "", + "username": "root", + "password": "Milvus", "enable_tls": False, "ignore_partition": False, "connect_timeout": 10 @@ -47,25 +49,34 @@ def test_cdc_resume_task(self, upstream_host, upstream_port, downstream_host, do assert result log.info(f"create task response: {rsp}") task_id = rsp['task_id'] + return task_id + + def resume_task_with_collection_name(self, upstream_host, upstream_port, downstream_host, downstream_port, collection_name, task_id): + """ + target: test cdc resume task + method: pause task, resume task + expected: pause and resume successfully, data synced after resume + """ # create collection and insert entities into it in upstream + connections.disconnect("default") connections.connect(host=upstream_host, port=upstream_port) checker = InsertEntitiesCollectionChecker(host=upstream_host, port=upstream_port, c_name=collection_name) checker.run() - time.sleep(60) + time.sleep(20) # pause the insert task - log.info(f"start to pause the insert task") + log.info("start to pause the insert task") checker.pause() - log.info(f"pause the insert task successfully") + log.info("pause the insert task successfully") # check the collection in upstream num_entities_upstream = 
checker.get_num_entities() log.info(f"num_entities_upstream: {num_entities_upstream}") count_by_query_upstream = checker.get_count_by_query() - log.info(f"count_by_query_upstream: {count_by_query_upstream}") + log.info(f"count_by_query_upstream: {count_by_query_upstream}") # check the collection in downstream connections.disconnect("default") log.info(f"start to connect to downstream {downstream_host} {downstream_port}") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) collection = Collection(name=collection_name) collection.create_index(field_name="float_vector", index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}) @@ -114,7 +125,7 @@ def test_cdc_resume_task(self, upstream_host, upstream_port, downstream_host, do connections.connect(host=upstream_host, port=upstream_port) # insert entities into the collection checker.resume() - time.sleep(60) + time.sleep(20) checker.pause() # check the collection in upstream count_by_query_upstream_second = checker.get_count_by_query() @@ -127,15 +138,16 @@ def test_cdc_resume_task(self, upstream_host, upstream_port, downstream_host, do # connect to downstream connections.disconnect("default") log.info(f"start to connect to downstream {downstream_host} {downstream_port}") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) # check the collection in downstream has not been synced timeout = 60 + collection = Collection(name=collection_name) count_by_query_downstream_second = len( - collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) + collection.query(expr=checker.query_expr, output_fields=checker.output_fields, consistency_level="Eventually")) t0 = time.time() while True and time.time() - t0 < timeout: count_by_query_downstream_second = len( - 
collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) + collection.query(expr=checker.query_expr, output_fields=checker.output_fields, consistency_level="Eventually")) if count_by_query_downstream_second == count_by_query_upstream_second: assert False time.sleep(1) @@ -162,6 +174,6 @@ def test_cdc_resume_task(self, upstream_host, upstream_port, downstream_host, do time.sleep(1) if time.time() - t0 > timeout: log.info(f"count_by_query_downstream_second: {count_by_query_downstream_second}") - raise Exception(f"Timeout waiting for collection {collection_name} to be synced") + raise Exception(f"Timeout waiting for collection {collection_name} to be synced") log.info(f"after resume cdc task, count_by_query_downstream_second: {count_by_query_downstream_second}") - assert count_by_query_downstream_second == count_by_query_upstream_second + assert count_by_query_downstream_second == count_by_query_upstream_second