From 62280f7a728225aeda0d2ae2f0e1b9c8bc434957 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Thu, 7 Nov 2024 13:25:37 +0530 Subject: [PATCH 01/27] Added logic for updating modificationTimestamp and modificationBy for BM attribute update. --- .../atlas/repository/store/graph/v2/EntityGraphMapper.java | 1 + 1 file changed, 1 insertion(+) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 0b5eb09fca..ad3c0cf11b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -823,6 +823,7 @@ public void addOrUpdateBusinessAttributes(AtlasVertex entityVertex, AtlasEntityT } if (MapUtils.isNotEmpty(updatedBusinessAttributes)) { + updateModificationMetadata(entityVertex); entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes); } From 2ddf8fe2e62ce1a2d4ce2c2beb6e46587322dfcc Mon Sep 17 00:00:00 2001 From: hr2904 Date: Thu, 7 Nov 2024 13:29:35 +0530 Subject: [PATCH 02/27] Added logic for updating modificationTimestamp and modificationBy for BM attribute update - 2 --- .../atlas/repository/store/graph/v2/EntityGraphMapper.java | 1 + 1 file changed, 1 insertion(+) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index ad3c0cf11b..5152bb2e5d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -739,6 +739,7 @@ public void setBusinessAttributes(AtlasVertex entityVertex, AtlasEntityType enti } if (MapUtils.isNotEmpty(updatedBusinessAttributes)) { + updateModificationMetadata(entityVertex); entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes); } From f227a6895a137b0a3892b1d14fce6bc7f5074b76 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Mon, 11 Nov 2024 13:54:04 +0530 Subject: [PATCH 03/27] Added secure in ATLASSESSIONID Cookie. --- .github/workflows/maven.yml | 4 ++-- webapp/src/main/webapp/WEB-INF/web.xml | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index f8a09b5589..3af25b1ea5 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -25,7 +25,7 @@ on: - beta - development - master - - lineageondemand + - dg1908 jobs: build: @@ -64,7 +64,7 @@ jobs: - name: Build with Maven run: | branch_name=${{ steps.get_branch.outputs.branch }} - if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'lineageondemand' ]] + if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'dg1908' ]] then echo "build without dashboard" chmod +x ./build.sh && ./build.sh build_without_dashboard diff --git a/webapp/src/main/webapp/WEB-INF/web.xml b/webapp/src/main/webapp/WEB-INF/web.xml index 07092d62eb..590901b279 100755 --- a/webapp/src/main/webapp/WEB-INF/web.xml +++ b/webapp/src/main/webapp/WEB-INF/web.xml @@ -129,6 +129,7 @@ ATLASSESSIONID true + true From a6577ce8d88e2a18241579c8b6d59ffc0b1490cf Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Mon, 11 Nov 2024 19:43:24 +0530 Subject: [PATCH 04/27] feat: set utm tag as null to update it later --- intg/src/main/java/org/apache/atlas/AtlasConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index a25a9a06cc..bf49c46d96 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -114,7 +114,7 @@ public enum AtlasConfiguration { INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300), ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000), - ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", "project_sdk_python"), + ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", null), ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false), From da751401234b2ffc667338d3c28480f56a48c220 Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Mon, 11 Nov 2024 20:04:28 +0530 Subject: [PATCH 05/27] feat: add configurable abuse protection --- intg/src/main/java/org/apache/atlas/AtlasConfiguration.java | 3 ++- .../src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index bf49c46d96..90bcf357f2 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -114,7 +114,8 @@ public enum AtlasConfiguration { INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300), ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000), - ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", null), + ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", "project_sdk_python"), + ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false), ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false), diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java index 1e8af3f804..ba94ff263c 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java @@ -401,7 +401,7 @@ public AtlasSearchResult indexSearch(@Context HttpServletRequest servletRequest, perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.indexSearch(" + parameters + ")"); } - if (parameters.getQuerySize() > AtlasConfiguration.ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT.getLong()) { + if (AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_API_LIMIT.getBoolean() && parameters.getQuerySize() > AtlasConfiguration.ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT.getLong()) { if(CollectionUtils.isEmpty(parameters.getUtmTags())) { throw new AtlasBaseException(AtlasErrorCode.INVALID_DSL_QUERY_SIZE, String.valueOf(AtlasConfiguration.ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT.getLong())); } From d174ad002267890166221b18621f0fea978fb6ee Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Mon, 11 Nov 2024 20:06:44 +0530 Subject: [PATCH 06/27] feat: make it empty --- intg/src/main/java/org/apache/atlas/AtlasConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 90bcf357f2..7fd081b9e3 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -114,7 +114,7 @@ public enum AtlasConfiguration { INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300), ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000), - ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", "project_sdk_python"), + ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""), ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false), ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false), From a7d650aadc6902434311d1fa2e7f39ad38ed3631 Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Tue, 12 Nov 2024 01:03:14 +0530 Subject: [PATCH 07/27] fix: Added 20s timeout for lineage traversal --- .../lineage/AtlasLineageOnDemandInfo.java | 9 + .../atlas/discovery/EntityLineageService.java | 163 +++++++++++------- .../atlas/discovery/TimeoutTracker.java | 17 ++ 3 files changed, 122 insertions(+), 67 deletions(-) create mode 100644 repository/src/main/java/org/apache/atlas/discovery/TimeoutTracker.java diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageOnDemandInfo.java b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageOnDemandInfo.java index 3a86a8a963..9ddcfe8f2d 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageOnDemandInfo.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageOnDemandInfo.java @@ -33,6 +33,7 @@ public class AtlasLineageOnDemandInfo implements Serializable { private LineageOnDemandRequest lineageOnDemandPayload; private boolean upstreamEntityLimitReached; private boolean downstreamEntityLimitReached; + private boolean traversalTimedOut; public AtlasLineageOnDemandInfo() { } @@ -135,6 +136,14 @@ public void setDownstreamEntityLimitReached(boolean downstreamEntityLimitReached this.downstreamEntityLimitReached = downstreamEntityLimitReached; } + public boolean isTraversalTimedOut() { + return traversalTimedOut; + } + + public void setTraversalTimedOut(boolean traversalTimedOut) { + this.traversalTimedOut = traversalTimedOut; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index a873105a6d..17758bded6 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -298,12 +298,13 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtomicInteger inputEntitiesTraversed = new AtomicInteger(0); AtomicInteger outputEntitiesTraversed = new AtomicInteger(0); AtomicInteger traversalOrder = new AtomicInteger(1); + TimeoutTracker tracker = new TimeoutTracker(50); // 20 seconds timeout if (isDataSet) { AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder); + traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder, tracker); if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder); + traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder, tracker); AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader); ret.getGuidEntityMap().put(guid, baseEntityHeader); @@ -312,11 +313,11 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); + traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder, tracker); } if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); + traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder, tracker); } } RequestContext.get().endMetricRecord(metricRecorder); @@ -329,7 +330,7 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal baseEntityHeader.setFinishTime(traversalOrder.get()); } - private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { + private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, TimeoutTracker timeoutTracker) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; int nextLevel = isInput ? level - 1: level + 1; @@ -346,7 +347,7 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI } boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); - if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, new HashSet<>())) { + if (handleHorizontalAndVerticalPagination(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, new HashSet<>(), timeoutTracker)) { break; } else { addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder); @@ -360,66 +361,44 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains)); } - traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); + traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, timeoutTracker); } } - private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { + private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, TimeoutTracker timeoutTracker) throws AtlasBaseException { if (isEntityTraversalLimitReached(entitiesTraversed)) return; if (depth != 0) { // base condition of recursion for depth AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesOnDemand"); - AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; - int nextLevel = isInput ? level - 1: level + 1; - // keep track of visited vertices to avoid circular loop - visitedVertices.add(getId(datasetVertex)); + try { + int nextLevel = isInput ? level - 1 : level + 1; + // keep track of visited vertices to avoid circular loop + visitedVertices.add(getId(datasetVertex)); - AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); - Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); - RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn); + AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); + Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn); - while (incomingEdges.hasNext()) { - AtlasEdge incomingEdge = incomingEdges.next(); - AtlasVertex processVertex = incomingEdge.getOutVertex(); + while (incomingEdges.hasNext()) { + AtlasEdge incomingEdge = incomingEdges.next(); + AtlasVertex processVertex = incomingEdge.getOutVertex(); - if (!vertexMatchesEvaluation(processVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { - continue; - } - - if (checkForOffset(incomingEdge, datasetVertex, atlasLineageOnDemandContext, ret)) { - continue; - } - - if (incrementAndCheckIfRelationsLimitReached(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, visitedVertices)) { - LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(baseGuid); - if (entityOnDemandInfo == null) - continue; - if (isInput ? entityOnDemandInfo.isInputRelationsReachedLimit() : entityOnDemandInfo.isOutputRelationsReachedLimit()) - break; - else + if (!vertexMatchesEvaluation(processVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { continue; - } else { - addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext, level, traversalOrder); - } - - AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); - Iterator outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); - RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); - - while (outgoingEdges.hasNext()) { - AtlasEdge outgoingEdge = outgoingEdges.next(); - AtlasVertex entityVertex = outgoingEdge.getInVertex(); + } - if (!vertexMatchesEvaluation(entityVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(outgoingEdge, atlasLineageOnDemandContext)) { - continue; + if (timeoutTracker.hasTimedOut()) { + executeCircuitBreaker(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); + return; } - if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { + if (checkForOffset(incomingEdge, datasetVertex, atlasLineageOnDemandContext, ret)) { continue; } - if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, visitedVertices)) { - String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); - LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid); + + boolean isRelationsLimitReached = handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); + if (isRelationsLimitReached) { + LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(baseGuid); if (entityOnDemandInfo == null) continue; if (isInput ? entityOnDemandInfo.isInputRelationsReachedLimit() : entityOnDemandInfo.isOutputRelationsReachedLimit()) @@ -427,24 +406,69 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i else continue; } else { - addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder); - entitiesTraversed.incrementAndGet(); - traversalOrder.incrementAndGet(); - if (isEntityTraversalLimitReached(entitiesTraversed)) - setEntityLimitReachedFlag(isInput, ret); + addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext, level, traversalOrder); } - if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { - traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth - AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex)); - traversedEntity.setFinishTime(traversalOrder.get()); + + AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); + Iterator outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); + + while (outgoingEdges.hasNext()) { + AtlasEdge outgoingEdge = outgoingEdges.next(); + AtlasVertex entityVertex = outgoingEdge.getInVertex(); + + if (!vertexMatchesEvaluation(entityVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(outgoingEdge, atlasLineageOnDemandContext)) { + continue; + } + + if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { + continue; + } + boolean stopTraversal = handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); + // If timeout occurred or entity limit reached, set pagination flags again for consistency and stop + if (timeoutTracker.hasTimedOut()) { + handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); + break; + } + if (stopTraversal) { + String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); + LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid); + + if (entityOnDemandInfo == null) + continue; + if (isInput ? entityOnDemandInfo.isInputRelationsReachedLimit() : entityOnDemandInfo.isOutputRelationsReachedLimit()) + break; + else + continue; + } else { + addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder); + entitiesTraversed.incrementAndGet(); + traversalOrder.incrementAndGet(); + + if (isEntityTraversalLimitReached(entitiesTraversed)) + setEntityLimitReachedFlag(isInput, ret); + } + if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { + traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, timeoutTracker); // execute inner depth + AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex)); + if (traversedEntity != null) + traversedEntity.setFinishTime(traversalOrder.get()); + } } } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); } - - RequestContext.get().endMetricRecord(metricRecorder); } } + + private void executeCircuitBreaker(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, Set visitedVertices, TimeoutTracker timeoutTracker) { + // Set timeout flags on the entity + handleHorizontalAndVerticalPagination(atlasEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); + ret.setTraversalTimedOut(true); + } + private static void setEntityLimitReachedFlag(boolean isInput, AtlasLineageOnDemandInfo ret) { if (isInput) ret.setUpstreamEntityLimitReached(true); @@ -684,7 +708,7 @@ private static String getId(AtlasVertex vertex) { return vertex.getIdForDisplay(); } - private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, AtlasLineageOnDemandInfo.LineageDirection direction, Set visitedVertices) { + private boolean handleHorizontalAndVerticalPagination(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, Set visitedVertices, TimeoutTracker timeoutTracker) { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("incrementAndCheckIfRelationsLimitReached"); AtlasVertex inVertex = isInput ? atlasEdge.getOutVertex() : atlasEdge.getInVertex(); @@ -698,9 +722,9 @@ private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge atlasEdge, bo LineageInfoOnDemand inLineageInfo = ret.getRelationsOnDemand().containsKey(inGuid) ? ret.getRelationsOnDemand().get(inGuid) : new LineageInfoOnDemand(inGuidLineageConstraints); LineageInfoOnDemand outLineageInfo = ret.getRelationsOnDemand().containsKey(outGuid) ? ret.getRelationsOnDemand().get(outGuid) : new LineageInfoOnDemand(outGuidLineageConstraints); - setHorizontalPaginationFlags(isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, inVertex, inGuid, outVertex, outGuid, inLineageInfo, outLineageInfo, visitedVertices); + setHorizontalPaginationFlags(isInput, atlasLineageOnDemandContext, depth, entitiesTraversed, inVertex, outVertex, inLineageInfo, outLineageInfo, visitedVertices, timeoutTracker); - boolean hasRelationsLimitReached = setVerticalPaginationFlags(entitiesTraversed, inLineageInfo, outLineageInfo); + boolean hasRelationsLimitReached = setVerticalPaginationFlags(entitiesTraversed, inLineageInfo, outLineageInfo, timeoutTracker); if (!hasRelationsLimitReached) { ret.getRelationsOnDemand().put(inGuid, inLineageInfo); ret.getRelationsOnDemand().put(outGuid, outLineageInfo); @@ -710,8 +734,13 @@ private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge atlasEdge, bo return hasRelationsLimitReached; } - private boolean setVerticalPaginationFlags(AtomicInteger entitiesTraversed, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo) { + private boolean setVerticalPaginationFlags(AtomicInteger entitiesTraversed, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, TimeoutTracker timeoutTracker) { boolean hasRelationsLimitReached = false; + if (timeoutTracker.hasTimedOut()) { + inLineageInfo.setHasMoreInputs(true); + outLineageInfo.setHasMoreOutputs(true); + } + if (inLineageInfo.isInputRelationsReachedLimit() || outLineageInfo.isOutputRelationsReachedLimit() || isEntityTraversalLimitReached(entitiesTraversed)) { inLineageInfo.setHasMoreInputs(true); outLineageInfo.setHasMoreOutputs(true); @@ -725,10 +754,10 @@ private boolean setVerticalPaginationFlags(AtomicInteger entitiesTraversed, Line return hasRelationsLimitReached; } - private void setHorizontalPaginationFlags(boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, AtlasVertex inVertex, String inGuid, AtlasVertex outVertex, String outGuid, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, Set visitedVertices) { + private void setHorizontalPaginationFlags(boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int depth, AtomicInteger entitiesTraversed, AtlasVertex inVertex, AtlasVertex outVertex, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, Set visitedVertices, TimeoutTracker timeoutTracker) { boolean isOutVertexVisited = visitedVertices.contains(getId(outVertex)); boolean isInVertexVisited = visitedVertices.contains(getId(inVertex)); - if (depth == 1 || entitiesTraversed.get() == getLineageMaxNodeAllowedCount()-1) { // is the vertex a leaf? + if (depth == 1 || entitiesTraversed.get() == getLineageMaxNodeAllowedCount()-1 || timeoutTracker.hasTimedOut()) { // If traversal has to stop, set pagination flags if (isInput && ! isOutVertexVisited) setHasUpstream(atlasLineageOnDemandContext, outVertex, outLineageInfo); else if (!isInput && ! isInVertexVisited) diff --git a/repository/src/main/java/org/apache/atlas/discovery/TimeoutTracker.java b/repository/src/main/java/org/apache/atlas/discovery/TimeoutTracker.java new file mode 100644 index 0000000000..3f7fce2553 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/TimeoutTracker.java @@ -0,0 +1,17 @@ +package org.apache.atlas.discovery; + +public class TimeoutTracker { + + private final long startTime; + private final long timeoutMillis; + + public TimeoutTracker(long timeoutMillis) { + this.startTime = System.currentTimeMillis(); + this.timeoutMillis = timeoutMillis; + } + + public boolean hasTimedOut() { + return (System.currentTimeMillis() - startTime) > timeoutMillis; + } + +} From 750a0ab1f4ce7c763a2c0f2bcb0327acec56ced6 Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Tue, 12 Nov 2024 13:42:33 +0530 Subject: [PATCH 08/27] build fix --- .../java/org/apache/atlas/discovery/EntityLineageService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 17758bded6..b0e3f63fe8 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -522,6 +522,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); continue; } + if (checkOffsetAndSkipEntity(lineageListContext, ret)) { skippedVertices.add(currentGUID); enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); From b15906f7e4276ea167521af10b904ffa7e42f1c4 Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Tue, 12 Nov 2024 16:52:19 +0530 Subject: [PATCH 09/27] fix --- .../java/org/apache/atlas/discovery/EntityLineageService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index a873105a6d..0c5850f241 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -543,6 +543,7 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges); while (edges.hasNext()) { + AtlasEdge currentEdge = edges.next(); if (!lineageListContext.evaluateTraversalFilter(currentEdge)) continue; From 3a21da7046356b9639e7f71ad3a7a8b87f34e051 Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Tue, 12 Nov 2024 17:13:42 +0530 Subject: [PATCH 10/27] ci fix --- .github/workflows/maven.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index f8a09b5589..84d3ddfe43 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -77,7 +77,7 @@ jobs: shell: bash - name: Get version tag - run: echo "##[set-output name=version;]$(echo `git ls-remote https://${{ secrets.ORG_PAT_GITHUB }}@github.com/atlanhq/${REPOSITORY_NAME}.git ${{ steps.get_branch.outputs.branch }} | awk '{ print $1}' | cut -c1-7`)abcd" + run: echo "##[set-output name=version;]$(echo `git ls-remote https://${{ secrets.ORG_PAT_GITHUB }}@github.com/atlanhq/${REPOSITORY_NAME}.git refs/heads/${{ steps.get_branch.outputs.branch }} | awk '{ print $1}' | cut -c1-7`)abcd" id: get_version - name: Set up Buildx From 2aa74f085a9c56ca1343ac7e6aff92219d344c01 Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Tue, 12 Nov 2024 18:50:09 +0530 Subject: [PATCH 11/27] Added 15s timeout --- .../org/apache/atlas/discovery/EntityLineageService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index b0e3f63fe8..918d3167b4 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -81,7 +81,7 @@ @Service public class EntityLineageService implements AtlasLineageService { private static final Logger LOG = LoggerFactory.getLogger(EntityLineageService.class); - + private static final long LINEAGE_TRAVERSAL_TIMEOUT_MILLIS = 15000; private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; private static final String COLUMNS = "columns"; @@ -298,7 +298,7 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtomicInteger inputEntitiesTraversed = new AtomicInteger(0); AtomicInteger outputEntitiesTraversed = new AtomicInteger(0); AtomicInteger traversalOrder = new AtomicInteger(1); - TimeoutTracker tracker = new TimeoutTracker(50); // 20 seconds timeout + TimeoutTracker tracker = new TimeoutTracker(LINEAGE_TRAVERSAL_TIMEOUT_MILLIS); if (isDataSet) { AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) @@ -723,6 +723,7 @@ private boolean handleHorizontalAndVerticalPagination(AtlasEdge atlasEdge, boole LineageInfoOnDemand inLineageInfo = ret.getRelationsOnDemand().containsKey(inGuid) ? ret.getRelationsOnDemand().get(inGuid) : new LineageInfoOnDemand(inGuidLineageConstraints); LineageInfoOnDemand outLineageInfo = ret.getRelationsOnDemand().containsKey(outGuid) ? ret.getRelationsOnDemand().get(outGuid) : new LineageInfoOnDemand(outGuidLineageConstraints); + setHorizontalPaginationFlags(isInput, atlasLineageOnDemandContext, depth, entitiesTraversed, inVertex, outVertex, inLineageInfo, outLineageInfo, visitedVertices, timeoutTracker); boolean hasRelationsLimitReached = setVerticalPaginationFlags(entitiesTraversed, inLineageInfo, outLineageInfo, timeoutTracker); From 17b7ec97540a424720b71c57a50eba744ab6e90b Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Tue, 12 Nov 2024 22:22:53 +0530 Subject: [PATCH 12/27] Timeout - response clean up handling --- .../atlas/discovery/EntityLineageService.java | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 918d3167b4..7e5fd20992 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -387,17 +387,18 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i continue; } - if (timeoutTracker.hasTimedOut()) { - executeCircuitBreaker(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); - return; - } - if (checkForOffset(incomingEdge, datasetVertex, atlasLineageOnDemandContext, ret)) { continue; } - boolean isRelationsLimitReached = handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); - if (isRelationsLimitReached) { + boolean stopProcessIteration = handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); + // If timeout occurred or entity limit reached, set pagination flags again for consistency and stop + if (timeoutTracker.hasTimedOut()) { + handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); + break; + } + + if (stopProcessIteration) { LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(baseGuid); if (entityOnDemandInfo == null) continue; @@ -424,13 +425,15 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { continue; } - boolean stopTraversal = handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); - // If timeout occurred or entity limit reached, set pagination flags again for consistency and stop + boolean stopDatasetIteration = handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); + // If timeout occurred or entity limit reached, set pagination flags again for consistency if (timeoutTracker.hasTimedOut()) { handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); - break; + ret.setTraversalTimedOut(true); + return; } - if (stopTraversal) { + + if (stopDatasetIteration) { String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid); @@ -447,6 +450,11 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i if (isEntityTraversalLimitReached(entitiesTraversed)) setEntityLimitReachedFlag(isInput, ret); + if (timeoutTracker.hasTimedOut()) { + handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); + ret.setTraversalTimedOut(true); + return; + } } if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, timeoutTracker); // execute inner depth From 0e531548d95511901828917fe459c93b317798ab Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Tue, 12 Nov 2024 22:38:03 +0530 Subject: [PATCH 13/27] Timeout - response clean up handling --- .../apache/atlas/discovery/EntityLineageService.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 7e5fd20992..a881539741 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -366,7 +366,7 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI } private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, TimeoutTracker timeoutTracker) throws AtlasBaseException { - if (isEntityTraversalLimitReached(entitiesTraversed)) + if (isEntityTraversalLimitReached(entitiesTraversed) || timeoutTracker.hasTimedOut()) return; if (depth != 0) { // base condition of recursion for depth AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesOnDemand"); @@ -461,6 +461,10 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex)); if (traversedEntity != null) traversedEntity.setFinishTime(traversalOrder.get()); + if (timeoutTracker.hasTimedOut()) { + ret.setTraversalTimedOut(true); + return; + } } } } @@ -471,12 +475,6 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i } - private void executeCircuitBreaker(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, Set visitedVertices, TimeoutTracker timeoutTracker) { - // Set timeout flags on the entity - handleHorizontalAndVerticalPagination(atlasEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); - ret.setTraversalTimedOut(true); - } - private static void setEntityLimitReachedFlag(boolean isInput, AtlasLineageOnDemandInfo ret) { if (isInput) ret.setUpstreamEntityLimitReached(true); From 34b4f6d7727d1bad638ca673f31529e927c10e3e Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Wed, 13 Nov 2024 00:25:29 +0530 Subject: [PATCH 14/27] Timeout - response clean up handling --- intg/src/main/java/org/apache/atlas/AtlasConfiguration.java | 2 +- .../java/org/apache/atlas/discovery/EntityLineageService.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 7fd081b9e3..5fe8a5e96f 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -94,7 +94,7 @@ public enum AtlasConfiguration { GRAPH_TRAVERSAL_PARALLELISM("atlas.graph.traverse.bucket.size",10), LINEAGE_ON_DEMAND_ENABLED("atlas.lineage.on.demand.enabled", true), LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT("atlas.lineage.on.demand.default.node.count", 3), - LINEAGE_MAX_NODE_COUNT("atlas.lineage.max.node.count", 100), + LINEAGE_MAX_NODE_COUNT("atlas.lineage.max.node.count", 1000), SUPPORTED_RELATIONSHIP_EVENTS("atlas.notification.relationships.filter", "asset_readme,asset_links"), diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index a881539741..58d2717272 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -81,7 +81,7 @@ @Service public class EntityLineageService implements AtlasLineageService { private static final Logger LOG = LoggerFactory.getLogger(EntityLineageService.class); - private static final long LINEAGE_TRAVERSAL_TIMEOUT_MILLIS = 15000; + private static final long LINEAGE_TRAVERSAL_TIMEOUT_MILLIS = 4; private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; private static final String COLUMNS = "columns"; From f31e7ef78dedd9061b99e293407851b6c7712263 Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Wed, 13 Nov 2024 00:26:40 +0530 Subject: [PATCH 15/27] Timeout - response clean up handling --- .../java/org/apache/atlas/discovery/EntityLineageService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 58d2717272..66591daa9d 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -81,7 +81,7 @@ @Service public class EntityLineageService implements AtlasLineageService { private static final Logger LOG = LoggerFactory.getLogger(EntityLineageService.class); - private static final long LINEAGE_TRAVERSAL_TIMEOUT_MILLIS = 4; + private static final long LINEAGE_TRAVERSAL_TIMEOUT_MILLIS = 4000; private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; private static final String COLUMNS = "columns"; From 3202acdc84e87180869847119bb994e369741dd1 Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Wed, 13 Nov 2024 02:33:43 +0530 Subject: [PATCH 16/27] fixes --- .../atlas/discovery/EntityLineageService.java | 63 ++++++++----------- 1 file changed, 27 insertions(+), 36 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 66591daa9d..c70edb0bde 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -347,7 +347,7 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI } boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); - if (handleHorizontalAndVerticalPagination(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, new HashSet<>(), timeoutTracker)) { + if (handleHorizontalAndVerticalPagination(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, new HashSet<>(), false)) { break; } else { addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder); @@ -381,8 +381,14 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i while (incomingEdges.hasNext()) { AtlasEdge incomingEdge = incomingEdges.next(); - AtlasVertex processVertex = incomingEdge.getOutVertex(); + boolean isTimedOut = timeoutTracker.hasTimedOut(); + if (isTimedOut) { + handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); + ret.setTraversalTimedOut(true); + break; + } + AtlasVertex processVertex = incomingEdge.getOutVertex(); if (!vertexMatchesEvaluation(processVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { continue; } @@ -391,13 +397,8 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i continue; } - boolean stopProcessIteration = handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); + boolean stopProcessIteration = handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); // If timeout occurred or entity limit reached, set pagination flags again for consistency and stop - if (timeoutTracker.hasTimedOut()) { - handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); - break; - } - if (stopProcessIteration) { LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(baseGuid); if (entityOnDemandInfo == null) @@ -416,8 +417,14 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i while (outgoingEdges.hasNext()) { AtlasEdge outgoingEdge = outgoingEdges.next(); - AtlasVertex entityVertex = outgoingEdge.getInVertex(); + isTimedOut = timeoutTracker.hasTimedOut(); + if (isTimedOut) { + handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); + ret.setTraversalTimedOut(true); + break; + } + AtlasVertex entityVertex = outgoingEdge.getInVertex(); if (!vertexMatchesEvaluation(entityVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(outgoingEdge, atlasLineageOnDemandContext)) { continue; } @@ -425,14 +432,8 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { continue; } - boolean stopDatasetIteration = handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); - // If timeout occurred or entity limit reached, set pagination flags again for consistency - if (timeoutTracker.hasTimedOut()) { - handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); - ret.setTraversalTimedOut(true); - return; - } + boolean stopDatasetIteration = handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); if (stopDatasetIteration) { String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid); @@ -450,21 +451,12 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i if (isEntityTraversalLimitReached(entitiesTraversed)) setEntityLimitReachedFlag(isInput, ret); - if (timeoutTracker.hasTimedOut()) { - handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); - ret.setTraversalTimedOut(true); - return; - } } if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, timeoutTracker); // execute inner depth AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex)); if (traversedEntity != null) traversedEntity.setFinishTime(traversalOrder.get()); - if (timeoutTracker.hasTimedOut()) { - ret.setTraversalTimedOut(true); - return; - } } } } @@ -715,7 +707,7 @@ private static String getId(AtlasVertex vertex) { return vertex.getIdForDisplay(); } - private boolean handleHorizontalAndVerticalPagination(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, Set visitedVertices, TimeoutTracker timeoutTracker) { + private boolean handleHorizontalAndVerticalPagination(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, Set visitedVertices, boolean isTimedOut) { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("incrementAndCheckIfRelationsLimitReached"); AtlasVertex inVertex = isInput ? atlasEdge.getOutVertex() : atlasEdge.getInVertex(); @@ -729,10 +721,9 @@ private boolean handleHorizontalAndVerticalPagination(AtlasEdge atlasEdge, boole LineageInfoOnDemand inLineageInfo = ret.getRelationsOnDemand().containsKey(inGuid) ? ret.getRelationsOnDemand().get(inGuid) : new LineageInfoOnDemand(inGuidLineageConstraints); LineageInfoOnDemand outLineageInfo = ret.getRelationsOnDemand().containsKey(outGuid) ? ret.getRelationsOnDemand().get(outGuid) : new LineageInfoOnDemand(outGuidLineageConstraints); + setHorizontalPaginationFlags(isInput, atlasLineageOnDemandContext, depth, entitiesTraversed, inVertex, outVertex, inLineageInfo, outLineageInfo, visitedVertices, isTimedOut); - setHorizontalPaginationFlags(isInput, atlasLineageOnDemandContext, depth, entitiesTraversed, inVertex, outVertex, inLineageInfo, outLineageInfo, visitedVertices, timeoutTracker); - - boolean hasRelationsLimitReached = setVerticalPaginationFlags(entitiesTraversed, inLineageInfo, outLineageInfo, timeoutTracker); + boolean hasRelationsLimitReached = setVerticalPaginationFlags(inLineageInfo, outLineageInfo, isTimedOut); if (!hasRelationsLimitReached) { ret.getRelationsOnDemand().put(inGuid, inLineageInfo); ret.getRelationsOnDemand().put(outGuid, outLineageInfo); @@ -742,14 +733,14 @@ private boolean handleHorizontalAndVerticalPagination(AtlasEdge atlasEdge, boole return hasRelationsLimitReached; } - private boolean setVerticalPaginationFlags(AtomicInteger entitiesTraversed, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, TimeoutTracker timeoutTracker) { - boolean hasRelationsLimitReached = false; - if (timeoutTracker.hasTimedOut()) { + private boolean setVerticalPaginationFlags(LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, boolean isTimedOut) { + if (isTimedOut) { inLineageInfo.setHasMoreInputs(true); outLineageInfo.setHasMoreOutputs(true); + return true; } - - if (inLineageInfo.isInputRelationsReachedLimit() || outLineageInfo.isOutputRelationsReachedLimit() || isEntityTraversalLimitReached(entitiesTraversed)) { + boolean hasRelationsLimitReached = false; + if (inLineageInfo.isInputRelationsReachedLimit() || outLineageInfo.isOutputRelationsReachedLimit()) { inLineageInfo.setHasMoreInputs(true); outLineageInfo.setHasMoreOutputs(true); hasRelationsLimitReached = true; @@ -762,10 +753,10 @@ private boolean setVerticalPaginationFlags(AtomicInteger entitiesTraversed, Line return hasRelationsLimitReached; } - private void setHorizontalPaginationFlags(boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int depth, AtomicInteger entitiesTraversed, AtlasVertex inVertex, AtlasVertex outVertex, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, Set visitedVertices, TimeoutTracker timeoutTracker) { + private void setHorizontalPaginationFlags(boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int depth, AtomicInteger entitiesTraversed, AtlasVertex inVertex, AtlasVertex outVertex, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, Set visitedVertices, boolean isTimedOut) { boolean isOutVertexVisited = visitedVertices.contains(getId(outVertex)); boolean isInVertexVisited = visitedVertices.contains(getId(inVertex)); - if (depth == 1 || entitiesTraversed.get() == getLineageMaxNodeAllowedCount()-1 || timeoutTracker.hasTimedOut()) { // If traversal has to stop, set pagination flags + if (depth == 1 || entitiesTraversed.get() == getLineageMaxNodeAllowedCount()-1 || isTimedOut) { // If traversal has to stop, set pagination flags if (isInput && ! isOutVertexVisited) setHasUpstream(atlasLineageOnDemandContext, outVertex, outLineageInfo); else if (!isInput && ! isInVertexVisited) From 558bf81cc6848c65dbcdb0172a90bbec09800fdd Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Wed, 13 Nov 2024 02:38:17 +0530 Subject: [PATCH 17/27] fixes to timeout algo --- .../atlas/discovery/EntityLineageService.java | 63 ++++++++----------- 1 file changed, 27 insertions(+), 36 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index a881539741..16c3ec816a 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -347,7 +347,7 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI } boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); - if (handleHorizontalAndVerticalPagination(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, new HashSet<>(), timeoutTracker)) { + if (handleHorizontalAndVerticalPagination(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, new HashSet<>(), false)) { break; } else { addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder); @@ -381,8 +381,14 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i while (incomingEdges.hasNext()) { AtlasEdge incomingEdge = incomingEdges.next(); - AtlasVertex processVertex = incomingEdge.getOutVertex(); + boolean isTimedOut = timeoutTracker.hasTimedOut(); + if (isTimedOut) { + handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); + ret.setTraversalTimedOut(true); + break; + } + AtlasVertex processVertex = incomingEdge.getOutVertex(); if (!vertexMatchesEvaluation(processVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { continue; } @@ -391,13 +397,8 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i continue; } - boolean stopProcessIteration = handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); + boolean stopProcessIteration = handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); // If timeout occurred or entity limit reached, set pagination flags again for consistency and stop - if (timeoutTracker.hasTimedOut()) { - handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); - break; - } - if (stopProcessIteration) { LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(baseGuid); if (entityOnDemandInfo == null) @@ -416,8 +417,14 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i while (outgoingEdges.hasNext()) { AtlasEdge outgoingEdge = outgoingEdges.next(); - AtlasVertex entityVertex = outgoingEdge.getInVertex(); + isTimedOut = timeoutTracker.hasTimedOut(); + if (isTimedOut) { + handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); + ret.setTraversalTimedOut(true); + break; + } + AtlasVertex entityVertex = outgoingEdge.getInVertex(); if (!vertexMatchesEvaluation(entityVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(outgoingEdge, atlasLineageOnDemandContext)) { continue; } @@ -425,14 +432,8 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { continue; } - boolean stopDatasetIteration = handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); - // If timeout occurred or entity limit reached, set pagination flags again for consistency - if (timeoutTracker.hasTimedOut()) { - handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); - ret.setTraversalTimedOut(true); - return; - } + boolean stopDatasetIteration = handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); if (stopDatasetIteration) { String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid); @@ -450,21 +451,12 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i if (isEntityTraversalLimitReached(entitiesTraversed)) setEntityLimitReachedFlag(isInput, ret); - if (timeoutTracker.hasTimedOut()) { - handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, timeoutTracker); - ret.setTraversalTimedOut(true); - return; - } } if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, timeoutTracker); // execute inner depth AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex)); if (traversedEntity != null) traversedEntity.setFinishTime(traversalOrder.get()); - if (timeoutTracker.hasTimedOut()) { - ret.setTraversalTimedOut(true); - return; - } } } } @@ -715,7 +707,7 @@ private static String getId(AtlasVertex vertex) { return vertex.getIdForDisplay(); } - private boolean handleHorizontalAndVerticalPagination(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, Set visitedVertices, TimeoutTracker timeoutTracker) { + private boolean handleHorizontalAndVerticalPagination(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, Set visitedVertices, boolean isTimedOut) { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("incrementAndCheckIfRelationsLimitReached"); AtlasVertex inVertex = isInput ? atlasEdge.getOutVertex() : atlasEdge.getInVertex(); @@ -729,10 +721,9 @@ private boolean handleHorizontalAndVerticalPagination(AtlasEdge atlasEdge, boole LineageInfoOnDemand inLineageInfo = ret.getRelationsOnDemand().containsKey(inGuid) ? ret.getRelationsOnDemand().get(inGuid) : new LineageInfoOnDemand(inGuidLineageConstraints); LineageInfoOnDemand outLineageInfo = ret.getRelationsOnDemand().containsKey(outGuid) ? ret.getRelationsOnDemand().get(outGuid) : new LineageInfoOnDemand(outGuidLineageConstraints); + setHorizontalPaginationFlags(isInput, atlasLineageOnDemandContext, depth, entitiesTraversed, inVertex, outVertex, inLineageInfo, outLineageInfo, visitedVertices, isTimedOut); - setHorizontalPaginationFlags(isInput, atlasLineageOnDemandContext, depth, entitiesTraversed, inVertex, outVertex, inLineageInfo, outLineageInfo, visitedVertices, timeoutTracker); - - boolean hasRelationsLimitReached = setVerticalPaginationFlags(entitiesTraversed, inLineageInfo, outLineageInfo, timeoutTracker); + boolean hasRelationsLimitReached = setVerticalPaginationFlags(inLineageInfo, outLineageInfo, isTimedOut); if (!hasRelationsLimitReached) { ret.getRelationsOnDemand().put(inGuid, inLineageInfo); ret.getRelationsOnDemand().put(outGuid, outLineageInfo); @@ -742,14 +733,14 @@ private boolean handleHorizontalAndVerticalPagination(AtlasEdge atlasEdge, boole return hasRelationsLimitReached; } - private boolean setVerticalPaginationFlags(AtomicInteger entitiesTraversed, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, TimeoutTracker timeoutTracker) { - boolean hasRelationsLimitReached = false; - if (timeoutTracker.hasTimedOut()) { + private boolean setVerticalPaginationFlags(LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, boolean isTimedOut) { + if (isTimedOut) { inLineageInfo.setHasMoreInputs(true); outLineageInfo.setHasMoreOutputs(true); + return true; } - - if (inLineageInfo.isInputRelationsReachedLimit() || outLineageInfo.isOutputRelationsReachedLimit() || isEntityTraversalLimitReached(entitiesTraversed)) { + boolean hasRelationsLimitReached = false; + if (inLineageInfo.isInputRelationsReachedLimit() || outLineageInfo.isOutputRelationsReachedLimit()) { inLineageInfo.setHasMoreInputs(true); outLineageInfo.setHasMoreOutputs(true); hasRelationsLimitReached = true; @@ -762,10 +753,10 @@ private boolean setVerticalPaginationFlags(AtomicInteger entitiesTraversed, Line return hasRelationsLimitReached; } - private void setHorizontalPaginationFlags(boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int depth, AtomicInteger entitiesTraversed, AtlasVertex inVertex, AtlasVertex outVertex, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, Set visitedVertices, TimeoutTracker timeoutTracker) { + private void setHorizontalPaginationFlags(boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int depth, AtomicInteger entitiesTraversed, AtlasVertex inVertex, AtlasVertex outVertex, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, Set visitedVertices, boolean isTimedOut) { boolean isOutVertexVisited = visitedVertices.contains(getId(outVertex)); boolean isInVertexVisited = visitedVertices.contains(getId(inVertex)); - if (depth == 1 || entitiesTraversed.get() == getLineageMaxNodeAllowedCount()-1 || timeoutTracker.hasTimedOut()) { // If traversal has to stop, set pagination flags + if (depth == 1 || entitiesTraversed.get() == getLineageMaxNodeAllowedCount()-1 || isTimedOut) { // If traversal has to stop, set pagination flags if (isInput && ! isOutVertexVisited) setHasUpstream(atlasLineageOnDemandContext, outVertex, outLineageInfo); else if (!isInput && ! isInVertexVisited) From 505150a6df2a6763d81aa0148c340dba2c13006f Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Wed, 13 Nov 2024 02:47:27 +0530 Subject: [PATCH 18/27] moved pagination logic before timeouts --- .../atlas/discovery/EntityLineageService.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 16c3ec816a..9ee18d61a4 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -381,6 +381,11 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i while (incomingEdges.hasNext()) { AtlasEdge incomingEdge = incomingEdges.next(); + + if (checkForOffset(incomingEdge, datasetVertex, atlasLineageOnDemandContext, ret)) { + continue; + } + boolean isTimedOut = timeoutTracker.hasTimedOut(); if (isTimedOut) { handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); @@ -393,10 +398,6 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i continue; } - if (checkForOffset(incomingEdge, datasetVertex, atlasLineageOnDemandContext, ret)) { - continue; - } - boolean stopProcessIteration = handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); // If timeout occurred or entity limit reached, set pagination flags again for consistency and stop if (stopProcessIteration) { @@ -417,6 +418,11 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i while (outgoingEdges.hasNext()) { AtlasEdge outgoingEdge = outgoingEdges.next(); + + if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { + continue; + } + isTimedOut = timeoutTracker.hasTimedOut(); if (isTimedOut) { handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); @@ -429,10 +435,6 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i continue; } - if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { - continue; - } - boolean stopDatasetIteration = handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); if (stopDatasetIteration) { String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); From 0587eb1f5b64ba1d004950698c52535088245cb2 Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Wed, 13 Nov 2024 02:48:35 +0530 Subject: [PATCH 19/27] moved pagination logic before timeouts --- .../atlas/discovery/EntityLineageService.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index c70edb0bde..9872d8103c 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -381,6 +381,11 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i while (incomingEdges.hasNext()) { AtlasEdge incomingEdge = incomingEdges.next(); + + if (checkForOffset(incomingEdge, datasetVertex, atlasLineageOnDemandContext, ret)) { + continue; + } + boolean isTimedOut = timeoutTracker.hasTimedOut(); if (isTimedOut) { handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); @@ -393,10 +398,6 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i continue; } - if (checkForOffset(incomingEdge, datasetVertex, atlasLineageOnDemandContext, ret)) { - continue; - } - boolean stopProcessIteration = handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); // If timeout occurred or entity limit reached, set pagination flags again for consistency and stop if (stopProcessIteration) { @@ -417,6 +418,11 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i while (outgoingEdges.hasNext()) { AtlasEdge outgoingEdge = outgoingEdges.next(); + + if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { + continue; + } + isTimedOut = timeoutTracker.hasTimedOut(); if (isTimedOut) { handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); @@ -429,10 +435,6 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i continue; } - if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { - continue; - } - boolean stopDatasetIteration = handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); if (stopDatasetIteration) { String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); From 2e12c7fb25b23a2a5e1c55156f3bdf7d39be97ac Mon Sep 17 00:00:00 2001 From: aarshi Date: Wed, 13 Nov 2024 11:28:39 +0530 Subject: [PATCH 20/27] Configure async indexsearch --- .../atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java | 2 +- intg/src/main/java/org/apache/atlas/AtlasConfiguration.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index 9aa7cfe8ba..f862b643c4 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -132,7 +132,7 @@ private DirectIndexQueryResult runQueryWithLowLevelClient(SearchParams searchPar DirectIndexQueryResult result = null; try { - if(searchParams.isCallAsync()) { + if(searchParams.isCallAsync() || AtlasConfiguration.ENABLE_ASYNC_INDEXSEARCH.getBoolean()) { return performAsyncDirectIndexQuery(searchParams); } else{ String responseString = performDirectIndexQuery(searchParams.getQuery(), false); diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 7fd081b9e3..df2bca7860 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -113,6 +113,7 @@ public enum AtlasConfiguration { HERACLES_API_SERVER_URL("atlas.heracles.api.service.url", "http://heracles-service.heracles.svc.cluster.local"), INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300), + ENABLE_ASYNC_INDEXSEARCH("atlas.indexsearch.async.enable", false), ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000), ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""), ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false), From 7bba0eb8d17bc9ba5c7a4a5bf4e1b4d1e9ebc13c Mon Sep 17 00:00:00 2001 From: aarshi Date: Wed, 13 Nov 2024 12:07:11 +0530 Subject: [PATCH 21/27] enable async calls --- intg/src/main/java/org/apache/atlas/AtlasConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index df2bca7860..e4886f9812 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -113,7 +113,7 @@ public enum AtlasConfiguration { HERACLES_API_SERVER_URL("atlas.heracles.api.service.url", "http://heracles-service.heracles.svc.cluster.local"), INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300), - ENABLE_ASYNC_INDEXSEARCH("atlas.indexsearch.async.enable", false), + ENABLE_ASYNC_INDEXSEARCH("atlas.indexsearch.async.enable", true), ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000), ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""), ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false), From 6cae188db004be84cd169cd42c3aff8677f0c94a Mon Sep 17 00:00:00 2001 From: hr2904 Date: Wed, 13 Nov 2024 16:23:14 +0530 Subject: [PATCH 22/27] ADded updateTime in delete flow for updatesBM --- .../atlas/repository/store/graph/v2/EntityGraphMapper.java | 1 + 1 file changed, 1 insertion(+) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 5152bb2e5d..76f0be44cb 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -877,6 +877,7 @@ public void removeBusinessAttributes(AtlasVertex entityVertex, AtlasEntityType e } if (MapUtils.isNotEmpty(updatedBusinessAttributes)) { + updateModificationMetadata(entityVertex); entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes); } From d9b46dda4e73e45e67252742d717bec4ac8931b1 Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Wed, 13 Nov 2024 18:13:47 +0530 Subject: [PATCH 23/27] fixed traversal timeout --- .../org/apache/atlas/AtlasConfiguration.java | 3 +- .../lineage/AtlasLineageOnDemandInfo.java | 10 +- .../AtlasLineageOnDemandContext.java | 9 + .../atlas/discovery/EntityLineageService.java | 195 +++++++++--------- .../atlas/discovery/TimeoutChecker.java | 15 ++ .../atlas/discovery/TimeoutTracker.java | 17 -- 6 files changed, 128 insertions(+), 121 deletions(-) create mode 100644 repository/src/main/java/org/apache/atlas/discovery/TimeoutChecker.java delete mode 100644 repository/src/main/java/org/apache/atlas/discovery/TimeoutTracker.java diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 7fd081b9e3..25138e409a 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -94,7 +94,8 @@ public enum AtlasConfiguration { GRAPH_TRAVERSAL_PARALLELISM("atlas.graph.traverse.bucket.size",10), LINEAGE_ON_DEMAND_ENABLED("atlas.lineage.on.demand.enabled", true), LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT("atlas.lineage.on.demand.default.node.count", 3), - LINEAGE_MAX_NODE_COUNT("atlas.lineage.max.node.count", 100), + LINEAGE_MAX_NODE_COUNT("atlas.lineage.max.node.count", 50), + LINEAGE_TIMEOUT_MS("atlas.lineage.max.timeout.ms", 15000), SUPPORTED_RELATIONSHIP_EVENTS("atlas.notification.relationships.filter", "asset_readme,asset_links"), diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageOnDemandInfo.java b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageOnDemandInfo.java index 9ddcfe8f2d..7d2a64c8c1 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageOnDemandInfo.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageOnDemandInfo.java @@ -33,7 +33,7 @@ public class AtlasLineageOnDemandInfo implements Serializable { private LineageOnDemandRequest lineageOnDemandPayload; private boolean upstreamEntityLimitReached; private boolean downstreamEntityLimitReached; - private boolean traversalTimedOut; + private boolean timeoutOccurred; public AtlasLineageOnDemandInfo() { } @@ -136,12 +136,12 @@ public void setDownstreamEntityLimitReached(boolean downstreamEntityLimitReached this.downstreamEntityLimitReached = downstreamEntityLimitReached; } - public boolean isTraversalTimedOut() { - return traversalTimedOut; + public void setTimeoutOccurred(boolean timeoutOccurred) { + this.timeoutOccurred = timeoutOccurred; } - public void setTraversalTimedOut(boolean traversalTimedOut) { - this.traversalTimedOut = traversalTimedOut; + public boolean isTimeoutOccurred() { + return timeoutOccurred; } @Override diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java index 5509684855..d35a56b568 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java @@ -23,6 +23,7 @@ public class AtlasLineageOnDemandContext { private Set attributes; private Set relationAttributes; private LineageOnDemandBaseParams defaultParams; + private TimeoutChecker timeoutChecker; public AtlasLineageOnDemandContext(LineageOnDemandRequest lineageOnDemandRequest, AtlasTypeRegistry typeRegistry) { this.constraints = lineageOnDemandRequest.getConstraints(); @@ -81,6 +82,14 @@ public void setDefaultParams(LineageOnDemandBaseParams defaultParams) { this.defaultParams = defaultParams; } + public TimeoutChecker getTimeoutChecker() { + return timeoutChecker; + } + + public void setTimeoutChecker(TimeoutChecker timeoutChecker) { + this.timeoutChecker = timeoutChecker; + } + protected Predicate constructInMemoryPredicate(AtlasTypeRegistry typeRegistry, SearchParameters.FilterCriteria filterCriteria) { LineageSearchProcessor lineageSearchProcessor = new LineageSearchProcessor(); return lineageSearchProcessor.constructInMemoryPredicate(typeRegistry, filterCriteria); diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 9ee18d61a4..9dd862b333 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -81,13 +81,14 @@ @Service public class EntityLineageService implements AtlasLineageService { private static final Logger LOG = LoggerFactory.getLogger(EntityLineageService.class); - private static final long LINEAGE_TRAVERSAL_TIMEOUT_MILLIS = 15000; + private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; private static final String COLUMNS = "columns"; private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean(); private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000; private static final int LINEAGE_ON_DEMAND_DEFAULT_DEPTH = 3; + private static final long LINEAGE_TIMEOUT_MS = AtlasConfiguration.LINEAGE_TIMEOUT_MS.getLong(); private static final String SEPARATOR = "->"; private final AtlasGraph graph; @@ -298,13 +299,14 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtomicInteger inputEntitiesTraversed = new AtomicInteger(0); AtomicInteger outputEntitiesTraversed = new AtomicInteger(0); AtomicInteger traversalOrder = new AtomicInteger(1); - TimeoutTracker tracker = new TimeoutTracker(LINEAGE_TRAVERSAL_TIMEOUT_MILLIS); + TimeoutChecker timeoutChecker = new TimeoutChecker(LINEAGE_TIMEOUT_MS); + atlasLineageOnDemandContext.setTimeoutChecker(timeoutChecker); if (isDataSet) { AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder, tracker); + traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder); if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder, tracker); + traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder); AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader); ret.getGuidEntityMap().put(guid, baseEntityHeader); @@ -313,11 +315,11 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder, tracker); + traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); } if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder, tracker); + traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); } } RequestContext.get().endMetricRecord(metricRecorder); @@ -330,7 +332,7 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal baseEntityHeader.setFinishTime(traversalOrder.get()); } - private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, TimeoutTracker timeoutTracker) throws AtlasBaseException { + private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; int nextLevel = isInput ? level - 1: level + 1; @@ -347,7 +349,7 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI } boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); - if (handleHorizontalAndVerticalPagination(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, new HashSet<>(), false)) { + if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, new HashSet<>())) { break; } else { addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder); @@ -361,47 +363,86 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains)); } - traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, timeoutTracker); + traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); } } - private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, TimeoutTracker timeoutTracker) throws AtlasBaseException { - if (isEntityTraversalLimitReached(entitiesTraversed) || timeoutTracker.hasTimedOut()) + private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { + // Get timeout checker from context or create new one + TimeoutChecker timeoutChecker = atlasLineageOnDemandContext.getTimeoutChecker(); + // Check timeout before starting traversal + if (timeoutChecker.hasTimedOut()) { + handleTimeout(ret); + return; + } + + if (isEntityTraversalLimitReached(entitiesTraversed)) return; if (depth != 0) { // base condition of recursion for depth AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesOnDemand"); - try { - int nextLevel = isInput ? level - 1 : level + 1; - // keep track of visited vertices to avoid circular loop - visitedVertices.add(getId(datasetVertex)); + AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; + int nextLevel = isInput ? level - 1: level + 1; + // keep track of visited vertices to avoid circular loop + visitedVertices.add(getId(datasetVertex)); + + AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); + Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn); + + while (incomingEdges.hasNext()) { + // Check timeout periodically + if (timeoutChecker.hasTimedOut()) { + handleTimeout(ret); + return; + } - AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); - Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); - RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn); + AtlasEdge incomingEdge = incomingEdges.next(); + AtlasVertex processVertex = incomingEdge.getOutVertex(); - while (incomingEdges.hasNext()) { - AtlasEdge incomingEdge = incomingEdges.next(); + if (!vertexMatchesEvaluation(processVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { + continue; + } - if (checkForOffset(incomingEdge, datasetVertex, atlasLineageOnDemandContext, ret)) { - continue; - } + if (checkForOffset(incomingEdge, datasetVertex, atlasLineageOnDemandContext, ret)) { + continue; + } - boolean isTimedOut = timeoutTracker.hasTimedOut(); - if (isTimedOut) { - handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); - ret.setTraversalTimedOut(true); + if (incrementAndCheckIfRelationsLimitReached(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, visitedVertices)) { + LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(baseGuid); + if (entityOnDemandInfo == null) + continue; + if (isInput ? entityOnDemandInfo.isInputRelationsReachedLimit() : entityOnDemandInfo.isOutputRelationsReachedLimit()) break; + else + continue; + } else { + addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext, level, traversalOrder); + } + + AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); + Iterator outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); + + while (outgoingEdges.hasNext()) { + // Check timeout in inner loop as well + if (timeoutChecker.hasTimedOut()) { + handleTimeout(ret); + return; } - AtlasVertex processVertex = incomingEdge.getOutVertex(); - if (!vertexMatchesEvaluation(processVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { + AtlasEdge outgoingEdge = outgoingEdges.next(); + AtlasVertex entityVertex = outgoingEdge.getInVertex(); + + if (!vertexMatchesEvaluation(entityVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(outgoingEdge, atlasLineageOnDemandContext)) { continue; } - boolean stopProcessIteration = handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); - // If timeout occurred or entity limit reached, set pagination flags again for consistency and stop - if (stopProcessIteration) { - LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(baseGuid); + if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { + continue; + } + if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, visitedVertices)) { + String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); + LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid); if (entityOnDemandInfo == null) continue; if (isInput ? entityOnDemandInfo.isInputRelationsReachedLimit() : entityOnDemandInfo.isOutputRelationsReachedLimit()) @@ -409,65 +450,29 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i else continue; } else { - addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext, level, traversalOrder); + addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder); + entitiesTraversed.incrementAndGet(); + traversalOrder.incrementAndGet(); + if (isEntityTraversalLimitReached(entitiesTraversed)) + setEntityLimitReachedFlag(isInput, ret); } - - AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); - Iterator outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); - RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); - - while (outgoingEdges.hasNext()) { - AtlasEdge outgoingEdge = outgoingEdges.next(); - - if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { - continue; - } - - isTimedOut = timeoutTracker.hasTimedOut(); - if (isTimedOut) { - handleHorizontalAndVerticalPagination(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); - ret.setTraversalTimedOut(true); - break; - } - - AtlasVertex entityVertex = outgoingEdge.getInVertex(); - if (!vertexMatchesEvaluation(entityVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(outgoingEdge, atlasLineageOnDemandContext)) { - continue; - } - - boolean stopDatasetIteration = handleHorizontalAndVerticalPagination(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, visitedVertices, isTimedOut); - if (stopDatasetIteration) { - String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); - LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid); - - if (entityOnDemandInfo == null) - continue; - if (isInput ? entityOnDemandInfo.isInputRelationsReachedLimit() : entityOnDemandInfo.isOutputRelationsReachedLimit()) - break; - else - continue; - } else { - addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder); - entitiesTraversed.incrementAndGet(); - traversalOrder.incrementAndGet(); - - if (isEntityTraversalLimitReached(entitiesTraversed)) - setEntityLimitReachedFlag(isInput, ret); - } - if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { - traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, timeoutTracker); // execute inner depth - AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex)); - if (traversedEntity != null) - traversedEntity.setFinishTime(traversalOrder.get()); - } + if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { + traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth + AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex)); + if (traversedEntity != null) + traversedEntity.setFinishTime(traversalOrder.get()); } } - } finally { - RequestContext.get().endMetricRecord(metricRecorder); } + + RequestContext.get().endMetricRecord(metricRecorder); } } + private void handleTimeout(AtlasLineageOnDemandInfo ret) { + ret.setTimeoutOccurred(true); + LOG.warn("Lineage traversal timed out after {} ms", LINEAGE_TIMEOUT_MS); + } private static void setEntityLimitReachedFlag(boolean isInput, AtlasLineageOnDemandInfo ret) { if (isInput) @@ -522,7 +527,6 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); continue; } - if (checkOffsetAndSkipEntity(lineageListContext, ret)) { skippedVertices.add(currentGUID); enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); @@ -709,7 +713,7 @@ private static String getId(AtlasVertex vertex) { return vertex.getIdForDisplay(); } - private boolean handleHorizontalAndVerticalPagination(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, Set visitedVertices, boolean isTimedOut) { + private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, AtlasLineageOnDemandInfo.LineageDirection direction, Set visitedVertices) { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("incrementAndCheckIfRelationsLimitReached"); AtlasVertex inVertex = isInput ? atlasEdge.getOutVertex() : atlasEdge.getInVertex(); @@ -723,9 +727,9 @@ private boolean handleHorizontalAndVerticalPagination(AtlasEdge atlasEdge, boole LineageInfoOnDemand inLineageInfo = ret.getRelationsOnDemand().containsKey(inGuid) ? ret.getRelationsOnDemand().get(inGuid) : new LineageInfoOnDemand(inGuidLineageConstraints); LineageInfoOnDemand outLineageInfo = ret.getRelationsOnDemand().containsKey(outGuid) ? ret.getRelationsOnDemand().get(outGuid) : new LineageInfoOnDemand(outGuidLineageConstraints); - setHorizontalPaginationFlags(isInput, atlasLineageOnDemandContext, depth, entitiesTraversed, inVertex, outVertex, inLineageInfo, outLineageInfo, visitedVertices, isTimedOut); + setHorizontalPaginationFlags(isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, inVertex, inGuid, outVertex, outGuid, inLineageInfo, outLineageInfo, visitedVertices); - boolean hasRelationsLimitReached = setVerticalPaginationFlags(inLineageInfo, outLineageInfo, isTimedOut); + boolean hasRelationsLimitReached = setVerticalPaginationFlags(entitiesTraversed, inLineageInfo, outLineageInfo); if (!hasRelationsLimitReached) { ret.getRelationsOnDemand().put(inGuid, inLineageInfo); ret.getRelationsOnDemand().put(outGuid, outLineageInfo); @@ -735,14 +739,9 @@ private boolean handleHorizontalAndVerticalPagination(AtlasEdge atlasEdge, boole return hasRelationsLimitReached; } - private boolean setVerticalPaginationFlags(LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, boolean isTimedOut) { - if (isTimedOut) { - inLineageInfo.setHasMoreInputs(true); - outLineageInfo.setHasMoreOutputs(true); - return true; - } + private boolean setVerticalPaginationFlags(AtomicInteger entitiesTraversed, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo) { boolean hasRelationsLimitReached = false; - if (inLineageInfo.isInputRelationsReachedLimit() || outLineageInfo.isOutputRelationsReachedLimit()) { + if (inLineageInfo.isInputRelationsReachedLimit() || outLineageInfo.isOutputRelationsReachedLimit() || isEntityTraversalLimitReached(entitiesTraversed)) { inLineageInfo.setHasMoreInputs(true); outLineageInfo.setHasMoreOutputs(true); hasRelationsLimitReached = true; @@ -755,10 +754,10 @@ private boolean setVerticalPaginationFlags(LineageInfoOnDemand inLineageInfo, Li return hasRelationsLimitReached; } - private void setHorizontalPaginationFlags(boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int depth, AtomicInteger entitiesTraversed, AtlasVertex inVertex, AtlasVertex outVertex, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, Set visitedVertices, boolean isTimedOut) { + private void setHorizontalPaginationFlags(boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, AtlasVertex inVertex, String inGuid, AtlasVertex outVertex, String outGuid, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, Set visitedVertices) { boolean isOutVertexVisited = visitedVertices.contains(getId(outVertex)); boolean isInVertexVisited = visitedVertices.contains(getId(inVertex)); - if (depth == 1 || entitiesTraversed.get() == getLineageMaxNodeAllowedCount()-1 || isTimedOut) { // If traversal has to stop, set pagination flags + if (depth == 1 || entitiesTraversed.get() == getLineageMaxNodeAllowedCount()-1) { // is the vertex a leaf? if (isInput && ! isOutVertexVisited) setHasUpstream(atlasLineageOnDemandContext, outVertex, outLineageInfo); else if (!isInput && ! isInVertexVisited) diff --git a/repository/src/main/java/org/apache/atlas/discovery/TimeoutChecker.java b/repository/src/main/java/org/apache/atlas/discovery/TimeoutChecker.java new file mode 100644 index 0000000000..74147d9d1a --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/TimeoutChecker.java @@ -0,0 +1,15 @@ +package org.apache.atlas.discovery; + +public class TimeoutChecker { + private final long startTime; + private final long timeoutMs; + + public TimeoutChecker(long timeoutMs) { + this.startTime = System.currentTimeMillis(); + this.timeoutMs = timeoutMs; + } + + public boolean hasTimedOut() { + return System.currentTimeMillis() - startTime > timeoutMs; + } +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/discovery/TimeoutTracker.java b/repository/src/main/java/org/apache/atlas/discovery/TimeoutTracker.java deleted file mode 100644 index 3f7fce2553..0000000000 --- a/repository/src/main/java/org/apache/atlas/discovery/TimeoutTracker.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.apache.atlas.discovery; - -public class TimeoutTracker { - - private final long startTime; - private final long timeoutMillis; - - public TimeoutTracker(long timeoutMillis) { - this.startTime = System.currentTimeMillis(); - this.timeoutMillis = timeoutMillis; - } - - public boolean hasTimedOut() { - return (System.currentTimeMillis() - startTime) > timeoutMillis; - } - -} From ad3edbc68d2400b0eb60c8a2cf38ff3f5af7bd47 Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Wed, 13 Nov 2024 18:14:25 +0530 Subject: [PATCH 24/27] fixed traversal timeout --- intg/src/main/java/org/apache/atlas/AtlasConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 25138e409a..4ed55c0ed0 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -94,7 +94,7 @@ public enum AtlasConfiguration { GRAPH_TRAVERSAL_PARALLELISM("atlas.graph.traverse.bucket.size",10), LINEAGE_ON_DEMAND_ENABLED("atlas.lineage.on.demand.enabled", true), LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT("atlas.lineage.on.demand.default.node.count", 3), - LINEAGE_MAX_NODE_COUNT("atlas.lineage.max.node.count", 50), + LINEAGE_MAX_NODE_COUNT("atlas.lineage.max.node.count", 100), LINEAGE_TIMEOUT_MS("atlas.lineage.max.timeout.ms", 15000), SUPPORTED_RELATIONSHIP_EVENTS("atlas.notification.relationships.filter", "asset_readme,asset_links"), From 1df7000731d9f06c699d5e4b8f8400a80cc1dd4c Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Wed, 13 Nov 2024 19:17:23 +0530 Subject: [PATCH 25/27] resolved conflicts --- intg/src/main/java/org/apache/atlas/AtlasConfiguration.java | 1 + 1 file changed, 1 insertion(+) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 4ed55c0ed0..6f425c3958 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -114,6 +114,7 @@ public enum AtlasConfiguration { HERACLES_API_SERVER_URL("atlas.heracles.api.service.url", "http://heracles-service.heracles.svc.cluster.local"), INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300), + ENABLE_ASYNC_INDEXSEARCH("atlas.indexsearch.async.enable", true), ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000), ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""), ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false), From ee0eacf86c7ac903e655fd0c8ed320a6b10502da Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Wed, 13 Nov 2024 19:52:04 +0530 Subject: [PATCH 26/27] synced with master --- .github/workflows/maven.yml | 6 +++--- .../repository/graphdb/janus/AtlasElasticsearchQuery.java | 2 +- .../atlas/repository/graphdb/janus/SearchContextCache.java | 6 +++++- intg/src/main/java/org/apache/atlas/AtlasConfiguration.java | 1 - .../atlas/repository/store/graph/v2/EntityGraphMapper.java | 3 --- webapp/src/main/webapp/WEB-INF/web.xml | 1 - 6 files changed, 9 insertions(+), 10 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index afec7cf4e5..f8a09b5589 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -25,7 +25,7 @@ on: - beta - development - master - - dg1908 + - lineageondemand jobs: build: @@ -64,7 +64,7 @@ jobs: - name: Build with Maven run: | branch_name=${{ steps.get_branch.outputs.branch }} - if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'dg1908' ]] + if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'lineageondemand' ]] then echo "build without dashboard" chmod +x ./build.sh && ./build.sh build_without_dashboard @@ -77,7 +77,7 @@ jobs: shell: bash - name: Get version tag - run: echo "##[set-output name=version;]$(echo `git ls-remote https://${{ secrets.ORG_PAT_GITHUB }}@github.com/atlanhq/${REPOSITORY_NAME}.git refs/heads/${{ steps.get_branch.outputs.branch }} | awk '{ print $1}' | cut -c1-7`)abcd" + run: echo "##[set-output name=version;]$(echo `git ls-remote https://${{ secrets.ORG_PAT_GITHUB }}@github.com/atlanhq/${REPOSITORY_NAME}.git ${{ steps.get_branch.outputs.branch }} | awk '{ print $1}' | cut -c1-7`)abcd" id: get_version - name: Set up Buildx diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index f862b643c4..9aa7cfe8ba 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -132,7 +132,7 @@ private DirectIndexQueryResult runQueryWithLowLevelClient(SearchParams searchPar DirectIndexQueryResult result = null; try { - if(searchParams.isCallAsync() || AtlasConfiguration.ENABLE_ASYNC_INDEXSEARCH.getBoolean()) { + if(searchParams.isCallAsync()) { return performAsyncDirectIndexQuery(searchParams); } else{ String responseString = performDirectIndexQuery(searchParams.getQuery(), false); diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java index f7e5718f19..e7fc2c11ec 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java @@ -6,6 +6,9 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; + +import javax.inject.Inject; + @Component public class SearchContextCache { private static final Logger LOG = LoggerFactory.getLogger(SearchContextCache.class); @@ -14,7 +17,8 @@ public class SearchContextCache { public static final String INVALID_SEQUENCE = "invalid_sequence"; - public SearchContextCache(@Qualifier("redisServiceImpl") RedisService redisService) { + @Inject + public SearchContextCache(RedisService redisService) { SearchContextCache.redisService = redisService; } diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 6f425c3958..4ed55c0ed0 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -114,7 +114,6 @@ public enum AtlasConfiguration { HERACLES_API_SERVER_URL("atlas.heracles.api.service.url", "http://heracles-service.heracles.svc.cluster.local"), INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300), - ENABLE_ASYNC_INDEXSEARCH("atlas.indexsearch.async.enable", true), ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000), ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""), ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false), diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 76f0be44cb..0b5eb09fca 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -739,7 +739,6 @@ public void setBusinessAttributes(AtlasVertex entityVertex, AtlasEntityType enti } if (MapUtils.isNotEmpty(updatedBusinessAttributes)) { - updateModificationMetadata(entityVertex); entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes); } @@ -824,7 +823,6 @@ public void addOrUpdateBusinessAttributes(AtlasVertex entityVertex, AtlasEntityT } if (MapUtils.isNotEmpty(updatedBusinessAttributes)) { - updateModificationMetadata(entityVertex); entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes); } @@ -877,7 +875,6 @@ public void removeBusinessAttributes(AtlasVertex entityVertex, AtlasEntityType e } if (MapUtils.isNotEmpty(updatedBusinessAttributes)) { - updateModificationMetadata(entityVertex); entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes); } diff --git a/webapp/src/main/webapp/WEB-INF/web.xml b/webapp/src/main/webapp/WEB-INF/web.xml index 590901b279..07092d62eb 100755 --- a/webapp/src/main/webapp/WEB-INF/web.xml +++ b/webapp/src/main/webapp/WEB-INF/web.xml @@ -129,7 +129,6 @@ ATLASSESSIONID true - true From 3b154dedf9aef436d42585c4fc149c8b30556854 Mon Sep 17 00:00:00 2001 From: suraj5077 Date: Wed, 13 Nov 2024 19:55:26 +0530 Subject: [PATCH 27/27] synced with master --- .../atlas/repository/graphdb/janus/SearchContextCache.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java index e7fc2c11ec..f7e5718f19 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java @@ -6,9 +6,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; - -import javax.inject.Inject; - @Component public class SearchContextCache { private static final Logger LOG = LoggerFactory.getLogger(SearchContextCache.class); @@ -17,8 +14,7 @@ public class SearchContextCache { public static final String INVALID_SEQUENCE = "invalid_sequence"; - @Inject - public SearchContextCache(RedisService redisService) { + public SearchContextCache(@Qualifier("redisServiceImpl") RedisService redisService) { SearchContextCache.redisService = redisService; }