Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lin 1309 lineage timeout #3718

Open
wants to merge 42 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
62280f7
Added logic for updating modificationTimestamp and modificationBy for…
hr2904 Nov 7, 2024
2ddf8fe
Added logic for updating modificationTimestamp and modificationBy for…
hr2904 Nov 7, 2024
f227a68
Added secure in ATLASSESSIONID Cookie.
hr2904 Nov 11, 2024
a6577ce
feat: set utm tag as null to update it later
sumandas0 Nov 11, 2024
da75140
feat: add configurable abuse protection
sumandas0 Nov 11, 2024
d174ad0
feat: make it empty
sumandas0 Nov 11, 2024
6ccc33c
Merge pull request #3713 from atlanhq/plt-2751-idx-limit
sumandas0 Nov 11, 2024
a7d650a
fix: Added 20s timeout for lineage traversal
suraj5077 Nov 11, 2024
89a0b42
Merge pull request #3717 from atlanhq/lin-1309-lineage-timeout
suraj5077 Nov 12, 2024
750a0ab
build fix
suraj5077 Nov 12, 2024
0663deb
Merge pull request #3719 from atlanhq/lin-1309-lineage-timeout
suraj5077 Nov 12, 2024
b15906f
fix
suraj5077 Nov 12, 2024
3a21da7
ci fix
suraj5077 Nov 12, 2024
7bd7f50
Merge pull request #3721 from atlanhq/feat/test-ci
suraj5077 Nov 12, 2024
2aa74f0
Added 15s timeout
suraj5077 Nov 12, 2024
af8c07a
Merge pull request #3722 from atlanhq/lin-1309-lineage-timeout
suraj5077 Nov 12, 2024
17b7ec9
Timeout - response clean up handling
suraj5077 Nov 12, 2024
2ee3ddf
Merge pull request #3723 from atlanhq/lin-1309-lineage-timeout
suraj5077 Nov 12, 2024
0e53154
Timeout - response clean up handling
suraj5077 Nov 12, 2024
39cd837
Merge pull request #3724 from atlanhq/lin-1309-lineage-timeout
suraj5077 Nov 12, 2024
34b4f6d
Timeout - response clean up handling
suraj5077 Nov 12, 2024
f31e7ef
Timeout - response clean up handling
suraj5077 Nov 12, 2024
9789674
Merge pull request #3725 from atlanhq/temp-lin-1309-lineage-timeout
suraj5077 Nov 12, 2024
3202acd
fixes
suraj5077 Nov 12, 2024
d3edfc8
Merge pull request #3726 from atlanhq/temp-lin-1309-lineage-timeout
suraj5077 Nov 12, 2024
558bf81
fixes to timeout algo
suraj5077 Nov 12, 2024
505150a
moved pagination logic before timeouts
suraj5077 Nov 12, 2024
0587eb1
moved pagination logic before timeouts
suraj5077 Nov 12, 2024
b38f195
Merge pull request #3727 from atlanhq/temp-lin-1309-lineage-timeout
suraj5077 Nov 12, 2024
2e12c7f
Configure async indexsearch
aarshi0301 Nov 13, 2024
7bba0eb
enable async calls
aarshi0301 Nov 13, 2024
a2c9133
Merge pull request #3728 from atlanhq/task/asyncIndexSearch
aarshi0301 Nov 13, 2024
57eff30
Merge pull request #3729 from atlanhq/dg1908
hr2904 Nov 13, 2024
9c338a9
Merge pull request #3730 from atlanhq/DG-1894
hr2904 Nov 13, 2024
6cae188
Added updateTime in delete flow for updatesBM
hr2904 Nov 13, 2024
7db67a5
Merge pull request #3731 from atlanhq/DG-1894
hr2904 Nov 13, 2024
d9b46dd
fixed traversal timeout
suraj5077 Nov 13, 2024
ad3edbc
fixed traversal timeout
suraj5077 Nov 13, 2024
29b4617
resolved conflicts
suraj5077 Nov 13, 2024
1df7000
resolved conflicts
suraj5077 Nov 13, 2024
ee0eacf
synced with master
suraj5077 Nov 13, 2024
3b154de
synced with master
suraj5077 Nov 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public enum AtlasConfiguration {
LINEAGE_ON_DEMAND_ENABLED("atlas.lineage.on.demand.enabled", true),
LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT("atlas.lineage.on.demand.default.node.count", 3),
LINEAGE_MAX_NODE_COUNT("atlas.lineage.max.node.count", 100),
LINEAGE_TIMEOUT_MS("atlas.lineage.max.timeout.ms", 15000),

SUPPORTED_RELATIONSHIP_EVENTS("atlas.notification.relationships.filter", "asset_readme,asset_links"),

Expand All @@ -114,7 +115,8 @@ public enum AtlasConfiguration {

INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300),
ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000),
ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", "project_sdk_python"),
ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""),
ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false),

ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class AtlasLineageOnDemandInfo implements Serializable {
private LineageOnDemandRequest lineageOnDemandPayload;
private boolean upstreamEntityLimitReached;
private boolean downstreamEntityLimitReached;
private boolean timeoutOccurred;

/**
 * No-argument constructor. The body is empty, so it performs no
 * initialization of its own.
 */
public AtlasLineageOnDemandInfo() {
}
Expand Down Expand Up @@ -135,6 +136,14 @@ public void setDownstreamEntityLimitReached(boolean downstreamEntityLimitReached
this.downstreamEntityLimitReached = downstreamEntityLimitReached;
}

/**
 * Records whether a timeout occurred while this lineage result was being built.
 *
 * @param timeoutOccurred value to store in the {@code timeoutOccurred} flag
 */
public void setTimeoutOccurred(boolean timeoutOccurred) {
this.timeoutOccurred = timeoutOccurred;
}

/**
 * Reports the current value of the {@code timeoutOccurred} flag.
 *
 * @return the value last stored via {@link #setTimeoutOccurred(boolean)};
 *         {@code false} if it was never set (Java default for a boolean field)
 */
public boolean isTimeoutOccurred() {
return timeoutOccurred;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class AtlasLineageOnDemandContext {
private Set<String> attributes;
private Set<String> relationAttributes;
private LineageOnDemandBaseParams defaultParams;
private TimeoutChecker timeoutChecker;

public AtlasLineageOnDemandContext(LineageOnDemandRequest lineageOnDemandRequest, AtlasTypeRegistry typeRegistry) {
this.constraints = lineageOnDemandRequest.getConstraints();
Expand Down Expand Up @@ -81,6 +82,14 @@ public void setDefaultParams(LineageOnDemandBaseParams defaultParams) {
this.defaultParams = defaultParams;
}

/**
 * Returns the timeout checker attached to this context.
 *
 * @return the stored {@link TimeoutChecker}.
 *         NOTE(review): the field has no initializer, so this is presumably
 *         {@code null} until {@link #setTimeoutChecker(TimeoutChecker)} is
 *         called - confirm against the constructor.
 */
public TimeoutChecker getTimeoutChecker() {
return timeoutChecker;
}

/**
 * Attaches a timeout checker to this context, replacing any previous one.
 *
 * @param timeoutChecker checker to store; no validation is performed here
 */
public void setTimeoutChecker(TimeoutChecker timeoutChecker) {
this.timeoutChecker = timeoutChecker;
}

protected Predicate constructInMemoryPredicate(AtlasTypeRegistry typeRegistry, SearchParameters.FilterCriteria filterCriteria) {
LineageSearchProcessor lineageSearchProcessor = new LineageSearchProcessor();
return lineageSearchProcessor.constructInMemoryPredicate(typeRegistry, filterCriteria);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class EntityLineageService implements AtlasLineageService {
private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean();
private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000;
private static final int LINEAGE_ON_DEMAND_DEFAULT_DEPTH = 3;
private static final long LINEAGE_TIMEOUT_MS = AtlasConfiguration.LINEAGE_TIMEOUT_MS.getLong();
private static final String SEPARATOR = "->";

private final AtlasGraph graph;
Expand Down Expand Up @@ -298,6 +299,8 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag
AtomicInteger inputEntitiesTraversed = new AtomicInteger(0);
AtomicInteger outputEntitiesTraversed = new AtomicInteger(0);
AtomicInteger traversalOrder = new AtomicInteger(1);
TimeoutChecker timeoutChecker = new TimeoutChecker(LINEAGE_TIMEOUT_MS);
atlasLineageOnDemandContext.setTimeoutChecker(timeoutChecker);
if (isDataSet) {
AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
Expand Down Expand Up @@ -365,6 +368,14 @@ private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isI
}

private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set<String> visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException {
// Get the timeout checker stored on the context (the caller is expected to have set it)
TimeoutChecker timeoutChecker = atlasLineageOnDemandContext.getTimeoutChecker();
// Check timeout before starting traversal
if (timeoutChecker.hasTimedOut()) {
handleTimeout(ret);
return;
}

if (isEntityTraversalLimitReached(entitiesTraversed))
return;
if (depth != 0) { // base condition of recursion for depth
Expand All @@ -379,6 +390,12 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn);

while (incomingEdges.hasNext()) {
// Check timeout periodically
if (timeoutChecker.hasTimedOut()) {
handleTimeout(ret);
return;
}

AtlasEdge incomingEdge = incomingEdges.next();
AtlasVertex processVertex = incomingEdge.getOutVertex();

Expand Down Expand Up @@ -407,6 +424,12 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut);

while (outgoingEdges.hasNext()) {
// Check timeout in inner loop as well
if (timeoutChecker.hasTimedOut()) {
handleTimeout(ret);
return;
}

AtlasEdge outgoingEdge = outgoingEdges.next();
AtlasVertex entityVertex = outgoingEdge.getInVertex();

Expand Down Expand Up @@ -436,7 +459,8 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) {
traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth
AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex));
traversedEntity.setFinishTime(traversalOrder.get());
if (traversedEntity != null)
traversedEntity.setFinishTime(traversalOrder.get());
}
}
}
Expand All @@ -445,6 +469,11 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
}
}

/**
 * Marks the lineage result as cut short by the traversal timeout and logs a
 * warning the first time this happens for the given result.
 *
 * <p>{@code traverseEdgesOnDemand} calls this from its entry check and from
 * each edge loop, in every recursion frame. Once the deadline has passed,
 * every frame that is still unwinding calls it again, so the warning is
 * emitted only when the flag flips from {@code false} to {@code true} to
 * avoid flooding the log with identical lines for a single request.
 *
 * @param ret lineage result being built; its {@code timeoutOccurred} flag is set to {@code true}
 */
private void handleTimeout(AtlasLineageOnDemandInfo ret) {
    if (ret.isTimeoutOccurred()) {
        // Already flagged (and logged) by an earlier check for this result.
        return;
    }

    ret.setTimeoutOccurred(true);
    LOG.warn("Lineage traversal timed out after {} ms", LINEAGE_TIMEOUT_MS);
}

private static void setEntityLimitReachedFlag(boolean isInput, AtlasLineageOnDemandInfo ret) {
if (isInput)
ret.setUpstreamEntityLimitReached(true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.atlas.discovery;

/**
 * Tracks whether a fixed time budget, measured from the moment of
 * construction, has been exceeded.
 *
 * <p>Elapsed time is measured with {@link System#nanoTime()}, which is a
 * monotonic source. {@link System#currentTimeMillis()} follows the wall
 * clock, so a clock adjustment (NTP step, manual change) during a traversal
 * could make the timeout fire early or not at all.
 */
public class TimeoutChecker {
    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final long startNanos;
    private final long timeoutMs;

    /**
     * Starts the clock immediately.
     *
     * @param timeoutMs time budget in milliseconds, counted from construction
     */
    public TimeoutChecker(long timeoutMs) {
        this.startNanos = System.nanoTime();
        this.timeoutMs  = timeoutMs;
    }

    /**
     * Checks the elapsed time against the budget.
     *
     * @return {@code true} once strictly more than {@code timeoutMs}
     *         milliseconds have elapsed since construction
     */
    public boolean hasTimedOut() {
        // Convert elapsed nanos down to millis (rather than the timeout up to
        // nanos) so that very large timeout values cannot overflow.
        long elapsedMs = (System.nanoTime() - startNanos) / NANOS_PER_MILLI;

        return elapsedMs > timeoutMs;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public AtlasSearchResult indexSearch(@Context HttpServletRequest servletRequest,
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.indexSearch(" + parameters + ")");
}

if (parameters.getQuerySize() > AtlasConfiguration.ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT.getLong()) {
if (AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_API_LIMIT.getBoolean() && parameters.getQuerySize() > AtlasConfiguration.ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT.getLong()) {
if(CollectionUtils.isEmpty(parameters.getUtmTags())) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_DSL_QUERY_SIZE, String.valueOf(AtlasConfiguration.ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT.getLong()));
}
Expand Down
Loading