Skip to content

Commit

Permalink
MLH-108 Fix diff issue while update asset
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilbonte21 committed Feb 13, 2025
1 parent 5b32491 commit b5bc750
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.atlas.repository.store.graph.v2;

import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.util.AtlasEntityUtils;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry;
Expand All @@ -35,20 +37,26 @@
import java.util.Objects;

import static org.apache.atlas.repository.graph.GraphHelper.getCustomAttributes;
import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_ADD;
import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_DELETE;
import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_UPDATE;

public class AtlasEntityComparator {
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityRetriever;
private final Map<String, String> guidRefMap;
private final boolean skipClassificationCompare;
private final boolean appendClassifications;
private final boolean replaceClassifications;
private final boolean skipBusinessAttributeCompare;

public AtlasEntityComparator(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, Map<String, String> guidRefMap,
boolean skipClassificationCompare, boolean skipBusinessAttributeCompare) {
boolean replaceClassifications, boolean appendClassifications,
boolean skipBusinessAttributeCompare) {
this.typeRegistry = typeRegistry;
this.entityRetriever = entityRetriever;
this.guidRefMap = guidRefMap;
this.skipClassificationCompare = skipClassificationCompare;
this.appendClassifications = appendClassifications;
this.replaceClassifications = replaceClassifications;
this.skipBusinessAttributeCompare = skipBusinessAttributeCompare;
}

Expand Down Expand Up @@ -152,17 +160,35 @@ private AtlasEntityDiffResult getDiffResult(AtlasEntity updatedEntity, AtlasEnti
}
}

if (!skipClassificationCompare) {
if (replaceClassifications || appendClassifications) {
List<AtlasClassification> newVal = updatedEntity.getClassifications();
List<AtlasClassification> currVal = (storedEntity != null) ? storedEntity.getClassifications() : entityRetriever.getAllClassifications(storedVertex);

if (!Objects.equals(currVal, newVal)) {
diffEntity.setClassifications(newVal);
if (replaceClassifications) {
if (!Objects.equals(currVal, newVal)) {
diffEntity.setClassifications(newVal);

sectionsWithDiff++;
sectionsWithDiff++;

if (findOnlyFirstDiff) {
return new AtlasEntityDiffResult(diffEntity, true, false, false);
if (findOnlyFirstDiff) {
return new AtlasEntityDiffResult(diffEntity, true, false, false);
}
}
}

if (appendClassifications) {
Map<String, List<AtlasClassification>> diff = AtlasEntityUtils.validateAndGetTagsDiff(updatedEntity.getGuid(),
updatedEntity.getAddOrUpdateClassifications(),
currVal,
updatedEntity.getRemoveClassifications());

if (MapUtils.isNotEmpty(diff)) {
sectionsWithDiff++;
RequestContext.get().addTagsAppendDiff(updatedEntity.getGuid(), diff);

if (findOnlyFirstDiff) {
return new AtlasEntityDiffResult(diffEntity, true, false, false);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1588,7 +1588,7 @@ private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean
MetricRecorder checkForUnchangedEntities = RequestContext.get().startMetricRecord("checkForUnchangedEntities");

List<AtlasEntity> entitiesToSkipUpdate = new ArrayList<>();
AtlasEntityComparator entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, context.getGuidAssignments(), !replaceClassifications, !replaceBusinessAttributes);
AtlasEntityComparator entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, context.getGuidAssignments(), replaceClassifications, appendClassifications, !replaceBusinessAttributes);
RequestContext reqContext = RequestContext.get();

for (AtlasEntity entity : context.getUpdatedEntities()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,25 +430,33 @@ public EntityMutationResponse mapAttributesAndClassifications(EntityMutationCont
if (replaceClassifications) {
deleteClassifications(guid);
addClassifications(context, guid, updatedEntity.getClassifications());

} else if (appendClassifications) {
Map<String, List<AtlasClassification>> diff = AtlasEntityUtils.validateAndGetTagsDiff(updatedEntity.getGuid(),
updatedEntity.getAddOrUpdateClassifications(),
entityRetriever.getAllClassifications(vertex),
updatedEntity.getRemoveClassifications());
Map<String, List<AtlasClassification>> diff = RequestContext.get().getTagsAppendDiff(guid);

if (MapUtils.isNotEmpty(diff)) {
List<AtlasClassification> finalTags = new ArrayList<>();
if (diff.containsKey(PROCESS_DELETE)) {
for (AtlasClassification tag : diff.get(PROCESS_DELETE)) {
deleteClassification(updatedEntity.getGuid(), tag.getTypeName());
}
}

if (diff.containsKey(PROCESS_UPDATE)) {
finalTags.addAll(diff.get(PROCESS_UPDATE));
updateClassifications(context, updatedEntity.getGuid(), diff.get(PROCESS_UPDATE));
}

if (diff.containsKey(PROCESS_ADD)) {
finalTags.addAll(diff.get(PROCESS_ADD));
addClassifications(context, updatedEntity.getGuid(), diff.get(PROCESS_ADD));
}

if (diff.containsKey("NOOP")) {
finalTags.addAll(diff.get("NOOP"));
}

RequestContext.get().getDifferentialEntity(guid).setClassifications(finalTags); // For notifications
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public ContractPreProcessor(AtlasGraph graph, AtlasTypeRegistry typeRegistry,
super(graph, typeRegistry, entityRetriever, discovery);
this.storeDifferentialAudits = storeDifferentialAudits;
this.discovery = discovery;
this.entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, null, true, true);
this.entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, null, true, false, true);

}

Expand Down Expand Up @@ -277,7 +277,7 @@ private void datasetAttributeSync(EntityMutationContext context, AtlasEntity ass
}

private void recordEntityMutatedDetails(EntityMutationContext context, AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasEntityComparator entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, context.getGuidAssignments(), true, true);
AtlasEntityComparator entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, context.getGuidAssignments(), true, false, true);
AtlasEntityComparator.AtlasEntityDiffResult diffResult = entityComparator.getDiffResult(entity, vertex, !storeDifferentialAudits);
RequestContext reqContext = RequestContext.get();
if (diffResult.hasDifference()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.atlas.repository.store.graph.v2.ClassificationAssociator;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -92,6 +93,70 @@ public static Map<String, List<AtlasClassification>> validateAndGetTagsDiff(Stri
List<AtlasClassification> tagsToRemove) {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("validateAndGetTagsDiff");

try {
Map<String, List<AtlasClassification>> operationListMap = new HashMap<>();

List<AtlasClassification> toAdd = new ArrayList<>();
List<AtlasClassification> toUpdate = new ArrayList<>();
List<AtlasClassification> toRemove = new ArrayList<>();
List<AtlasClassification> toPreserve = new ArrayList<>();

Map<String, AtlasClassification> currentTagWithKeys = new HashMap<>();
Optional.ofNullable(currentTags).orElse(Collections.emptyList()).forEach(x -> currentTagWithKeys.put(generateClassificationComparisonKey(x), x));

Map<String, AtlasClassification> newTagWithKeys = new HashMap<>();
Optional.ofNullable(newTags).orElse(Collections.emptyList()).forEach(x -> newTagWithKeys.put(generateClassificationComparisonKey(x), x));

for (AtlasClassification tagToRemove: Optional.ofNullable(tagsToRemove).orElse(Collections.emptyList())) {
if (StringUtils.isEmpty(tagToRemove.getEntityGuid())) {
tagToRemove.setEntityGuid(entityGuid);
}

String tagToRemoveKey = generateClassificationComparisonKey(tagToRemove);
if (currentTagWithKeys.containsKey(tagToRemoveKey)) {
toRemove.add(tagToRemove);
newTagWithKeys.remove(tagToRemoveKey); // performs dedup across addOrUpdate & remove tags list
} else {
//ignoring the tag as it was not already present on the asset
}
}

for (String newTagKey: newTagWithKeys.keySet()) {
AtlasClassification newTag = newTagWithKeys.get(newTagKey);

if (StringUtils.isEmpty(newTag.getEntityGuid())) {
newTag.setEntityGuid(entityGuid);
}

if (currentTagWithKeys.containsKey(newTagKey)) {
boolean hasDiff = !newTag.checkForUpdate(currentTagWithKeys.get(newTagKey));
if (hasDiff) {
toUpdate.add(newTag);
} else {
toPreserve.add(newTag);
}
} else {
toAdd.add(newTag);
}
}

bucket(PROCESS_DELETE, operationListMap, toRemove);
bucket(PROCESS_UPDATE, operationListMap, toUpdate);
bucket(PROCESS_ADD, operationListMap, toAdd);
bucket("NOOP", operationListMap, toPreserve);

return operationListMap;
} finally {
RequestContext.get().endMetricRecord(recorder);
}
}

/*public static Map<String, List<AtlasClassification>> validateAndGetTagsDiff(String entityGuid,
List<AtlasClassification> newTags,
List<AtlasClassification> currentTags,
List<AtlasClassification> tagsToRemove) {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("validateAndGetTagsDiff");
try {
Map<String, List<AtlasClassification>> operationListMap = new HashMap<>();
Set<String> preExistingClassificationKeys = new HashSet<>();
Expand Down Expand Up @@ -136,7 +201,7 @@ public static Map<String, List<AtlasClassification>> validateAndGetTagsDiff(Stri
} finally {
RequestContext.get().endMetricRecord(recorder);
}
}
}*/

private static String generateClassificationComparisonKey(AtlasClassification classification) {
return classification.getEntityGuid() + "|" + classification.getTypeName();
Expand Down
10 changes: 10 additions & 0 deletions server-api/src/main/java/org/apache/atlas/RequestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public class RequestContext {
private Map<AtlasClassification, Collection<Object>> deletedClassificationAndVertices = new HashMap<>();
private Map<AtlasClassification, Collection<Object>> addedClassificationAndVertices = new HashMap<>();

Map<String, Object> tagsAppendDiff = new HashMap<>();


private RequestContext() {
}
Expand Down Expand Up @@ -477,6 +479,14 @@ public void addAddedClassificationAndVertices(AtlasClassification classification
this.addedClassificationAndVertices.put(classification, vertices);
}

public Map<String, List<AtlasClassification>> getTagsAppendDiff(String entityGuid) {
return (Map<String, List<AtlasClassification>>) tagsAppendDiff.get(entityGuid);
}

public void addTagsAppendDiff(String entityGuid, Map<String, List<AtlasClassification>> tagsAppendDiff) {
this.tagsAppendDiff.put(entityGuid, tagsAppendDiff);
}

public void addToDeletedEdgesIds(String edgeId) {
deletedEdgesIds.add(edgeId);
}
Expand Down

This file was deleted.

This file was deleted.

0 comments on commit b5bc750

Please sign in to comment.