Skip to content

Commit

Permalink
Merge pull request #3958 from atlanhq/mesh-316-support-unarchive
Browse files Browse the repository at this point in the history
MESH-316 | Supporting Unarchiving of Products
  • Loading branch information
ankitpatnaik-atlan authored Feb 10, 2025
2 parents 21f976a + 2d1e931 commit a0154b0
Show file tree
Hide file tree
Showing 6 changed files with 377 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,11 @@ public final class Constants {
public static final String VERTEX_ID_IN_IMPORT_KEY = "__vIdInImport";
public static final String EDGE_ID_IN_IMPORT_KEY = "__eIdInImport";

/*
* Edge labels for data product relations which are hard deleted
*/

public static final Set<String> EDGE_LABELS_FOR_HARD_DELETION = new HashSet<>(Arrays.asList( OUTPUT_PORT_PRODUCT_EDGE_LABEL, INPUT_PORT_PRODUCT_EDGE_LABEL, TERM_ASSIGNMENT_LABEL ));
/*
* elasticsearch attributes
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package org.apache.atlas.repository.migration;

import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.TransactionInterceptHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Set;

import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.repository.Constants.EDGE_LABELS_FOR_HARD_DELETION;
import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getStatus;

public class SoftDeletionProductMigrationService {

private static final Logger LOG = LoggerFactory.getLogger(SoftDeletionProductMigrationService.class);

private final AtlasGraph graph;
private final Set<String> productGuids;
private final GraphHelper graphHelper;
private final TransactionInterceptHelper transactionInterceptHelper;

public SoftDeletionProductMigrationService(AtlasGraph graph, Set<String> productGuids, GraphHelper graphHelper, TransactionInterceptHelper transactionInterceptHelper) {
this.graph = graph;
this.productGuids = productGuids;
this.graphHelper = graphHelper;
this.transactionInterceptHelper = transactionInterceptHelper;
}

public void startEdgeMigration() throws AtlasBaseException {
try {
int count = 0;
int totalUpdatedCount = 0;
for (String productGuid: productGuids) {
LOG.info("Removing edges for Product: {}", productGuid);

if (productGuid != null && !productGuid.trim().isEmpty()) {
AtlasVertex productVertex = graphHelper.getVertexForGUID(productGuid);

if (productVertex == null) {
LOG.info("ProductGUID with no vertex found: {}", productGuid);
} else {
AtlasEntity.Status vertexStatus = getStatus(productVertex);

if (ACTIVE.equals(vertexStatus)) {
boolean isCommitRequired = deleteEdgeForActiveProduct(productVertex);
if (isCommitRequired) {
count++;
totalUpdatedCount++;
}
}

if (DELETED.equals(vertexStatus)) {
boolean isCommitRequired = deleteEdgeForArchivedProduct(productVertex);
if (isCommitRequired) {
count++;
totalUpdatedCount++;
}
}

if (count == 20) {
LOG.info("Committing batch of 20 products...");
commitChanges();
count = 0;
}
}
}
}

if (count > 0) {
LOG.info("Committing remaining {} products...", count);
commitChanges();
}

LOG.info("Total products updated: {}", totalUpdatedCount);
} catch (Exception e) {
LOG.error("Error while restoring state for Products: {}", productGuids, e);
throw new AtlasBaseException(e);
}
}


public boolean deleteEdgeForActiveProduct(AtlasVertex productVertex) {
boolean isCommitRequired = false;
try {
Iterator<AtlasEdge> existingEdges = productVertex.getEdges(AtlasEdgeDirection.BOTH, (String[]) EDGE_LABELS_FOR_HARD_DELETION.toArray(new String[0])).iterator();

if (existingEdges == null || !existingEdges.hasNext()) {
return isCommitRequired;
}

while (existingEdges.hasNext()) {
AtlasEdge edge = existingEdges.next();

AtlasEntity.Status edgeStatus = getStatus(edge);

if (DELETED.equals(edgeStatus)) {
graph.removeEdge(edge);
isCommitRequired = true;
}
}
} catch (Exception e) {
LOG.error("Error while deleting soft edges for Active Product: {}", productVertex, e);
throw new RuntimeException(e);
}
return isCommitRequired;
}


private boolean deleteEdgeForArchivedProduct(AtlasVertex productVertex) {
boolean isCommitRequired = false;
try {
Long updatedTime = productVertex.getProperty(MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class);
Iterator<AtlasEdge> existingEdges = productVertex.getEdges(AtlasEdgeDirection.BOTH, (String[]) EDGE_LABELS_FOR_HARD_DELETION.toArray(new String[0])).iterator();

if (existingEdges == null || !existingEdges.hasNext()) {
return isCommitRequired;
}

while (existingEdges.hasNext()) {
AtlasEdge edge = existingEdges.next();
Long modifiedEdgeTimestamp = edge.getProperty(MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class);

if (!updatedTime.equals(modifiedEdgeTimestamp)) {
LOG.info("Removing edge with different timestamp: {}", edge);
graph.removeEdge(edge);
isCommitRequired = true;
}
}
} catch (Exception e) {
LOG.error("Error while deleting edges for Archived Product: {}", productVertex, e);
throw new RuntimeException(e);
}
return isCommitRequired;
}

public void commitChanges() throws AtlasBaseException {
try {
transactionInterceptHelper.intercept();
LOG.info("Committed a entity to the graph");
} catch (Exception e) {
LOG.error("Failed to commit asset: ", e);
throw e;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package org.apache.atlas.repository.migration;

import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Set;

import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.repository.Constants.EDGE_LABELS_FOR_HARD_DELETION;
import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getStatus;

public class ValidateProductEdgesMigrationService {

private static final Logger LOG = LoggerFactory.getLogger(ValidateProductEdgesMigrationService.class);

private final Set<String> productGuids;
private final GraphHelper graphHelper;

public ValidateProductEdgesMigrationService(Set<String> productGuids, GraphHelper graphHelper) {
this.productGuids = productGuids;
this.graphHelper = graphHelper;
}

public boolean validateEdgeMigration() throws AtlasBaseException {
try {
int count = 0;
int totalProductChecked = 0;
boolean redundantEdgesFound = false;

for (String productGuid: productGuids) {
LOG.info("Validating edges for Product: {}", productGuid);

if (productGuid != null && !productGuid.trim().isEmpty()) {
AtlasVertex productVertex = graphHelper.getVertexForGUID(productGuid);

if (productVertex == null) {
LOG.info("ProductGUID with no vertex found: {}", productGuid);
} else {
AtlasEntity.Status vertexStatus = getStatus(productVertex);

if (ACTIVE.equals(vertexStatus)) {
boolean softDeletedEdgesFound = validateEdgeForActiveProduct(productVertex);
if (softDeletedEdgesFound) {
count++;
totalProductChecked++;
} else {
totalProductChecked++;
}
}

if (DELETED.equals(vertexStatus)) {
boolean edgeWithDifferentTimeStampFound = validateEdgeForArchivedProduct(productVertex);
if (edgeWithDifferentTimeStampFound) {
count++;
totalProductChecked++;
} else {
totalProductChecked++;
}
}
}
}
}

if (count > 0) {
redundantEdgesFound = true;
LOG.info("Found {} products with redundant edges....", count);
}

LOG.info("Total products checked: {}", totalProductChecked);

return redundantEdgesFound;
} catch (Exception e) {
LOG.error("Error while validating edges for Products: {}", productGuids, e);
throw new AtlasBaseException(e);
}
}

public boolean validateEdgeForActiveProduct (AtlasVertex productVertex) {
boolean softDeletedEdgesFound = false;

try {
Iterator<AtlasEdge> existingEdges = productVertex.getEdges(AtlasEdgeDirection.BOTH, (String[]) EDGE_LABELS_FOR_HARD_DELETION.toArray(new String[0])).iterator();

if (existingEdges == null || !existingEdges.hasNext()) {
LOG.info("No edges found for Product: {}", productVertex);
return softDeletedEdgesFound;
}

while (existingEdges.hasNext()) {
AtlasEdge edge = existingEdges.next();

AtlasEntity.Status edgeStatus = getStatus(edge);

if (DELETED.equals(edgeStatus)) {
softDeletedEdgesFound = true;
}
}
} catch (Exception e) {
LOG.error("Error while validating edges for Active Product: {}", productVertex, e);
throw new RuntimeException(e);
}

return softDeletedEdgesFound;
}

public boolean validateEdgeForArchivedProduct (AtlasVertex productVertex) {
boolean edgeWithDifferentTimeStampFound = false;
try {
Long updatedTime = productVertex.getProperty(MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class);
Iterator<AtlasEdge> existingEdges = productVertex.getEdges(AtlasEdgeDirection.BOTH, (String[]) EDGE_LABELS_FOR_HARD_DELETION.toArray(new String[0])).iterator();

if (existingEdges == null || !existingEdges.hasNext()) {
LOG.info("No edges found for Product: {}", productVertex);
return edgeWithDifferentTimeStampFound;
}

while (existingEdges.hasNext()) {
AtlasEdge edge = existingEdges.next();
Long modifiedEdgeTimestamp = edge.getProperty(MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class);

if (!updatedTime.equals(modifiedEdgeTimestamp)) {
LOG.info("Found edge with different timestamp: {}", edge);
edgeWithDifferentTimeStampFound = true;
}
}
} catch (Exception e) {
LOG.error("Error while validating edges for Archived Product: {}", productVertex, e);
throw new RuntimeException(e);
}
return edgeWithDifferentTimeStampFound;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ public abstract class DeleteHandlerV1 {
private final AtlasGraph graph;
private final TaskUtil taskUtil;


public DeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean shouldUpdateInverseReference, boolean softDelete, TaskManagement taskManagement) {
this.typeRegistry = typeRegistry;
this.graphHelper = new GraphHelper(graph);
Expand Down Expand Up @@ -386,7 +385,8 @@ public boolean deleteEdgeReference(AtlasEdge edge, TypeCategory typeCategory, bo
// for relationship edges, inverse vertex's relationship attribute doesn't need to be updated.
// only delete the reference relationship edge
if (GraphHelper.isRelationshipEdge(edge)) {
deleteEdge(edge, isInternalType || isCustomRelationship(edge));
deleteEdge(edge, isInternalType || isCustomRelationship(edge) || isHardDeleteProductRelationship(edge));

AtlasVertex referencedVertex = entityRetriever.getReferencedEntityVertex(edge, relationshipDirection, entityVertex);

if (referencedVertex != null) {
Expand All @@ -403,7 +403,7 @@ public boolean deleteEdgeReference(AtlasEdge edge, TypeCategory typeCategory, bo
//legacy case - not a relationship edge
//If deleting just the edge, reverse attribute should be updated for any references
//For example, for the department type system, if the person's manager edge is deleted, subordinates of manager should be updated
deleteEdge(edge, true, isInternalType || isCustomRelationship(edge));
deleteEdge(edge, true, isInternalType || isCustomRelationship(edge) || isHardDeleteProductRelationship(edge));
}
}

Expand Down Expand Up @@ -997,7 +997,7 @@ protected void deleteEdgeBetweenVertices(AtlasVertex outVertex, AtlasVertex inVe

if (edge != null) {
boolean isInternal = isInternalType(inVertex) && isInternalType(outVertex);
deleteEdge(edge, isInternal || isCustomRelationship(edge));
deleteEdge(edge, isInternal || isCustomRelationship(edge) || isHardDeleteProductRelationship(edge));

final RequestContext requestContext = RequestContext.get();
final String outId = GraphHelper.getGuid(outVertex);
Expand Down Expand Up @@ -1084,6 +1084,10 @@ private boolean isCustomRelationship(final AtlasEdge edge) {
return edge.getLabel().equals(UD_RELATIONSHIP_EDGE_LABEL);
}

private boolean isHardDeleteProductRelationship(final AtlasEdge edge) {
return EDGE_LABELS_FOR_HARD_DELETION.contains(edge.getLabel());
}

private void addToPropagatedClassificationNames(AtlasVertex entityVertex, String classificationName) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding property {} = \"{}\" to vertex {}", PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName, string(entityVertex));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1974,8 +1974,14 @@ public List mapArrayValue(AttributeMutationContext ctx, EntityMutationContext co
for (int index = 0; allArrayElements != null && index < allArrayElements.size(); index++) {
Object element = allArrayElements.get(index);

if (element instanceof AtlasEdge) {
AtlasGraphUtilsV2.setEncodedProperty((AtlasEdge) element, ATTRIBUTE_INDEX_PROPERTY_KEY, index);
if ((element instanceof AtlasEdge)) {
AtlasEdge edge = (AtlasEdge) element;
if ((removedElements.contains(element)) && ((EDGE_LABELS_FOR_HARD_DELETION).contains(edge.getLabel()))) {
continue;
}
else {
AtlasGraphUtilsV2.setEncodedProperty((AtlasEdge) element, ATTRIBUTE_INDEX_PROPERTY_KEY, index);
}
}
}

Expand Down Expand Up @@ -3000,7 +3006,7 @@ private List<AtlasEdge> removeUnusedArrayEntries(AtlasAttribute attribute, List<
List<AtlasEdge> additionalElements = new ArrayList<>();

for (AtlasEdge edge : edgesToRemove) {
if (getStatus(edge) == DELETED ) {
if (getStatus(edge) == DELETED) {
continue;
}

Expand Down
Loading

0 comments on commit a0154b0

Please sign in to comment.