perf: vector generate (#1748)
c121914yu authored Jun 12, 2024
1 parent d0085a2 commit 05611df
Showing 2 changed files with 108 additions and 108 deletions.
@@ -81,50 +81,61 @@ async function handler(
   });
 
   // get 10 init dataset.data
-  const arr = new Array(10).fill(0);
+  const max = global.systemEnv?.vectorMaxProcess || 10;
+  const arr = new Array(max * 2).fill(0);
 
   for await (const _ of arr) {
-    await mongoSessionRun(async (session) => {
-      const data = await MongoDatasetData.findOneAndUpdate(
-        {
-          teamId,
-          datasetId,
-          rebuilding: true
-        },
-        {
-          $unset: {
-            rebuilding: null
-          },
-          updateTime: new Date()
-        },
-        {
-          session
-        }
-      ).select({
-        _id: 1,
-        collectionId: 1
-      });
-
-      if (data) {
-        await MongoDatasetTraining.create(
-          [
-            {
-              teamId,
-              tmbId,
-              datasetId,
-              collectionId: data.collectionId,
-              billId,
-              mode: TrainingModeEnum.chunk,
-              model: vectorModel,
-              q: '1',
-              dataId: data._id
-            }
-          ],
-          {
-            session
-          }
-        );
-      }
-    });
+    try {
+      const hasNext = await mongoSessionRun(async (session) => {
+        // get next dataset.data
+        const data = await MongoDatasetData.findOneAndUpdate(
+          {
+            rebuilding: true,
+            teamId,
+            datasetId
+          },
+          {
+            $unset: {
+              rebuilding: null
+            },
+            updateTime: new Date()
+          },
+          {
+            session
+          }
+        ).select({
+          _id: 1,
+          collectionId: 1
+        });
+
+        if (data) {
+          await MongoDatasetTraining.create(
+            [
+              {
+                teamId,
+                tmbId,
+                datasetId,
+                collectionId: data.collectionId,
+                billId,
+                mode: TrainingModeEnum.chunk,
+                model: vectorModel,
+                q: '1',
+                dataId: data._id
+              }
+            ],
+            {
+              session
+            }
+          );
+        }
+
+        return !!data;
+      });
+
+      if (!hasNext) {
+        break;
+      }
+    } catch (error) {}
   }
 
   return {};
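The handler-side hunk above replaces the fixed batch of 10 with a batch sized from the vector worker pool (vectorMaxProcess * 2), claims each rebuilding row inside its own Mongo transaction, and stops as soon as no row is left. A minimal sketch of that claim-and-enqueue pattern follows; claimNextRebuildingRow, enqueueTraining and runInTransaction are hypothetical stand-ins for the MongoDatasetData.findOneAndUpdate, MongoDatasetTraining.create and mongoSessionRun calls in the diff, not FastGPT exports.

// Sketch only: these helpers are hypothetical stand-ins, not FastGPT exports.
type Session = Record<string, never>;

interface ClaimedRow {
  _id: string;
  collectionId: string;
}

// Stand-in for mongoSessionRun: a real version would open a Mongo session and transaction.
async function runInTransaction<T>(fn: (session: Session) => Promise<T>): Promise<T> {
  return fn({});
}

// Stand-in for MongoDatasetData.findOneAndUpdate({ rebuilding: true, ... }, { $unset: ... }, { session }).
async function claimNextRebuildingRow(session: Session): Promise<ClaimedRow | null> {
  return null;
}

// Stand-in for MongoDatasetTraining.create([...], { session }).
async function enqueueTraining(row: ClaimedRow, session: Session): Promise<void> {}

// Seed the training queue: claim up to maxProcess * 2 rows, one transaction per row,
// and stop early once no rebuilding row remains.
export async function seedTrainingQueue(maxProcess: number): Promise<void> {
  const batch = new Array(maxProcess * 2).fill(0);

  for await (const _ of batch) {
    try {
      const hasNext = await runInTransaction(async (session) => {
        const row = await claimNextRebuildingRow(session);
        if (row) {
          await enqueueTraining(row, session);
        }
        return !!row;
      });

      if (!hasNext) break;
    } catch (error) {
      // Swallow the error so one failed claim does not abort the rest of the batch.
    }
  }
}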
129 changes: 59 additions & 70 deletions projects/app/src/service/events/generateVector.ts
@@ -158,27 +158,69 @@ const rebuildData = async ({
 
   const deleteVectorIdList = mongoData.indexes.map((index) => index.dataId);
 
-  const { tokens } = await mongoSessionRun(async (session) => {
-    // update vector, update dataset.data rebuilding status, delete data from training
-    const updateResult = await Promise.all(
-      mongoData.indexes.map(async (index, i) => {
-        const result = await insertDatasetDataVector({
-          query: index.text,
-          model: getVectorModel(trainingData.model),
-          teamId: mongoData.teamId,
-          datasetId: mongoData.datasetId,
-          collectionId: mongoData.collectionId
-        });
-        mongoData.indexes[i].dataId = result.insertId;
-        return result;
-      })
-    );
-
-    // Ensure that the training data is deleted after the Mongo update is successful
+  // Find next rebuilding data to insert training queue
+  await mongoSessionRun(async (session) => {
+    // get new mongoData insert to training
+    const newRebuildingData = await MongoDatasetData.findOneAndUpdate(
+      {
+        teamId: mongoData.teamId,
+        datasetId: mongoData.datasetId,
+        rebuilding: true
+      },
+      {
+        $unset: {
+          rebuilding: null
+        },
+        updateTime: new Date()
+      },
+      { session }
+    ).select({
+      _id: 1,
+      collectionId: 1
+    });
+
+    if (newRebuildingData) {
+      await MongoDatasetTraining.create(
+        [
+          {
+            teamId: mongoData.teamId,
+            tmbId: trainingData.tmbId,
+            datasetId: mongoData.datasetId,
+            collectionId: newRebuildingData.collectionId,
+            billId: trainingData.billId,
+            mode: TrainingModeEnum.chunk,
+            model: trainingData.model,
+            q: '1',
+            dataId: newRebuildingData._id
+          }
+        ],
+        { session }
+      );
+    }
+  });
+
+  // update vector, update dataset_data rebuilding status, delete data from training
+  // 1. Insert new vector to dataset_data
+  const updateResult = await Promise.all(
+    mongoData.indexes.map(async (index, i) => {
+      const result = await insertDatasetDataVector({
+        query: index.text,
+        model: getVectorModel(trainingData.model),
+        teamId: mongoData.teamId,
+        datasetId: mongoData.datasetId,
+        collectionId: mongoData.collectionId
+      });
+      mongoData.indexes[i].dataId = result.insertId;
+      return result;
+    })
+  );
+  const { tokens } = await mongoSessionRun(async (session) => {
+    // 2. Ensure that the training data is deleted after the Mongo update is successful
     await mongoData.save({ session });
+    // 3. Delete the training data
     await trainingData.deleteOne({ session });
 
-    // delete old vector
+    // 4. Delete old vector
    await deleteDatasetDataVector({
       teamId: mongoData.teamId,
       idList: deleteVectorIdList
@@ -189,59 +231,6 @@
     };
   });
 
-  // find next data insert to training queue
-  const arr = new Array(5).fill(0);
-
-  for await (const _ of arr) {
-    try {
-      const hasNextData = await mongoSessionRun(async (session) => {
-        // get new mongoData insert to training
-        const newRebuildingData = await MongoDatasetData.findOneAndUpdate(
-          {
-            teamId: mongoData.teamId,
-            datasetId: mongoData.datasetId,
-            rebuilding: true
-          },
-          {
-            $unset: {
-              rebuilding: null
-            },
-            updateTime: new Date()
-          },
-          { session }
-        ).select({
-          _id: 1,
-          collectionId: 1
-        });
-
-        if (newRebuildingData) {
-          await MongoDatasetTraining.create(
-            [
-              {
-                teamId: mongoData.teamId,
-                tmbId: trainingData.tmbId,
-                datasetId: mongoData.datasetId,
-                collectionId: newRebuildingData.collectionId,
-                billId: trainingData.billId,
-                mode: TrainingModeEnum.chunk,
-                model: trainingData.model,
-                q: '1',
-                dataId: newRebuildingData._id
-              }
-            ],
-            { session }
-          );
-        }
-
-        return !!newRebuildingData;
-      });
-
-      if (!hasNextData) {
-        break;
-      }
-    } catch (error) {}
-  }
 
   return { tokens };
 };
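Taken together, the two generateVector.ts hunks reorder the rebuild flow: the worker first claims the next rebuilding row and enqueues it in one transaction (so the training queue stays fed without the old retry loop), then inserts the new vectors outside any transaction, and only then, in a second transaction, persists the updated indexes, deletes the training row, and removes the old vectors. A rough sketch of that ordering follows; it covers only the insert/save/delete steps (the claim-and-enqueue step is the same pattern as the handler sketch above), and insertVector, deleteVectors, runInTransaction and RebuildTarget are hypothetical stand-ins for insertDatasetDataVector, deleteDatasetDataVector, mongoSessionRun and the mongoData/trainingData documents in the diff.

// Sketch only: the types and helpers below are hypothetical stand-ins, not FastGPT exports.
type Session = Record<string, never>;

interface IndexItem {
  text: string;
  dataId?: string; // id of this index's vector in the vector store
}

interface RebuildTarget {
  indexes: IndexItem[];
  save(session: Session): Promise<void>;            // like mongoData.save({ session })
  deleteTraining(session: Session): Promise<void>;  // like trainingData.deleteOne({ session })
}

// Stand-in for insertDatasetDataVector: embed the text and write it to the vector store.
async function insertVector(text: string): Promise<{ insertId: string; tokens: number }> {
  return { insertId: 'new-vector-id', tokens: 0 };
}

// Stand-in for deleteDatasetDataVector.
async function deleteVectors(idList: string[]): Promise<void> {}

// Stand-in for mongoSessionRun.
async function runInTransaction<T>(fn: (session: Session) => Promise<T>): Promise<T> {
  return fn({});
}

export async function rebuildOne(target: RebuildTarget): Promise<{ tokens: number }> {
  const oldVectorIds = target.indexes.flatMap((i) => (i.dataId ? [i.dataId] : []));

  // 1. Insert the new vectors first, outside any Mongo transaction (the slow external call).
  const results = await Promise.all(
    target.indexes.map(async (index, i) => {
      const r = await insertVector(index.text);
      target.indexes[i].dataId = r.insertId;
      return r;
    })
  );

  // 2-4. Persist the updated indexes, delete the training row, then drop the old
  // vectors, all behind one transaction so a failed save leaves the training row
  // in place for a retry.
  return runInTransaction(async (session) => {
    await target.save(session);
    await target.deleteTraining(session);
    await deleteVectors(oldVectorIds);
    return { tokens: results.reduce((acc, r) => acc + r.tokens, 0) };
  });
}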

