Skip to content

Commit

Permalink
RANGER-4609:Support in File-based Tag Retriever to provide tag-deltas
Browse files Browse the repository at this point in the history
  • Loading branch information
kulkabhay committed Dec 15, 2023
1 parent 1fafc63 commit 4ecb2f8
Showing 1 changed file with 133 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.io.*;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class RangerFileBasedTagRetriever extends RangerTagRetriever {
Expand All @@ -40,7 +40,9 @@ public class RangerFileBasedTagRetriever extends RangerTagRetriever {
private String serviceTagsFileName;
private Gson gsonBuilder;
private boolean deDupTags;

int tagFilesCount = 0;
int currentTagFileIndex = 0;
boolean isInitial = true;
@Override
public void init(Map<String, String> options) {

Expand All @@ -55,63 +57,32 @@ public void init(Map<String, String> options) {
String serviceTagsFileNameProperty = "serviceTagsFileName";
String serviceTagsDefaultFileName = "/testdata/test_servicetags_hive.json";
String deDupTagsProperty = "deDupTags";
String tagFilesCountProperty = "tagFileCount";

if (StringUtils.isNotBlank(serviceName) && serviceDef != null && StringUtils.isNotBlank(appId)) {
InputStream serviceTagsFileStream = null;


// Open specified file from options- it should contain service-tags

serviceTagsFileName = options != null? options.get(serviceTagsFileNameProperty) : null;
String deDupTagsVal = options != null? options.get(deDupTagsProperty) : "false";
deDupTags = Boolean.parseBoolean(deDupTagsVal);

serviceTagsFileName = serviceTagsFileName == null ? serviceTagsDefaultFileName : serviceTagsFileName;

File f = new File(serviceTagsFileName);

if (f.exists() && f.isFile() && f.canRead()) {
try {
serviceTagsFileStream = new FileInputStream(f);
serviceTagsFileURL = f.toURI().toURL();
} catch (FileNotFoundException exception) {
LOG.error("Error processing input file:" + serviceTagsFileName + " or no privilege for reading file " + serviceTagsFileName, exception);
} catch (MalformedURLException malformedException) {
LOG.error("Error processing input file:" + serviceTagsFileName + " cannot be converted to URL " + serviceTagsFileName, malformedException);
}
} else {
URL fileURL = getClass().getResource(serviceTagsFileName);
if (fileURL == null && !serviceTagsFileName.startsWith("/")) {
fileURL = getClass().getResource("/" + serviceTagsFileName);
}

if (fileURL == null) {
fileURL = ClassLoader.getSystemClassLoader().getResource(serviceTagsFileName);
if (fileURL == null && !serviceTagsFileName.startsWith("/")) {
fileURL = ClassLoader.getSystemClassLoader().getResource("/" + serviceTagsFileName);
}
}

if (fileURL != null) {
if (options != null) {
String tagFilesCountStr = options.get(tagFilesCountProperty);
if (!StringUtils.isNotEmpty(tagFilesCountStr)) {
try {
serviceTagsFileStream = fileURL.openStream();
serviceTagsFileURL = fileURL;
} catch (Exception exception) {
LOG.error(serviceTagsFileName + " is not a file", exception);
tagFilesCount = Integer.parseInt(tagFilesCountStr);
} catch (Exception e) {
LOG.error("Exception while parsing tagFileCount option value:[" + tagFilesCountStr + "]");
LOG.error("Setting tagFilesCount to 0");
}
} else {
LOG.warn("Error processing input file: URL not found for " + serviceTagsFileName + " or no privilege for reading file " + serviceTagsFileName);
}
}

if (serviceTagsFileStream != null) {
try {
serviceTagsFileStream.close();
} catch (Exception e) {
// Ignore
}
if (StringUtils.isNotBlank(serviceTagsFileName)) {
serviceTagsFileURL = getTagFileURL(serviceTagsFileName);
}

isInitial = true;
} else {
LOG.error("FATAL: Cannot find service/serviceDef/serviceTagsFile to use for retrieving tags. Will NOT be able to retrieve tags.");
}
Expand All @@ -123,44 +94,140 @@ public void init(Map<String, String> options) {

@Override
public ServiceTags retrieveTags(long lastKnownVersion, long lastActivationTimeInMillis) throws Exception {

if (LOG.isDebugEnabled()) {
LOG.debug("==> retrieveTags(lastKnownVersion=" + lastKnownVersion + ", lastActivationTimeInMillis=" + lastActivationTimeInMillis + ", serviceTagsFilePath=" + serviceTagsFileName);
}

ServiceTags serviceTags = null;
ServiceTags serviceTags = readFromFile();

if (LOG.isDebugEnabled()) {
LOG.debug("<== retrieveTags(lastKnownVersion=" + lastKnownVersion + ", lastActivationTimeInMillis=" + lastActivationTimeInMillis);
}

return serviceTags;
}

URL getTagFileURL(String fileName) {
URL fileURL = null;

InputStream tagFileStream = null;

File f = new File(fileName);

if (f.exists() && f.isFile() && f.canRead()) {
try {
tagFileStream = new FileInputStream(f);
fileURL = f.toURI().toURL();
} catch (FileNotFoundException exception) {
LOG.error("Error processing input file:" + fileName + " or no privilege for reading file " + fileName, exception);
} catch (MalformedURLException malformedException) {
LOG.error("Error processing input file:" + fileName + " cannot be converted to URL " + fileName, malformedException);
}
} else {

fileURL = getClass().getResource(fileName);
if (fileURL == null) {
if (!fileName.startsWith("/")) {
fileURL = getClass().getResource("/" + fileName);
}
}

if (fileURL == null) {
fileURL = ClassLoader.getSystemClassLoader().getResource(fileName);
if (fileURL == null) {
if (!fileName.startsWith("/")) {
fileURL = ClassLoader.getSystemClassLoader().getResource("/" + fileName);
}
}
}

if (fileURL != null) {

try {
tagFileStream = fileURL.openStream();
} catch (Exception exception) {
fileURL = null;
LOG.error(fileName + " is not a file", exception);
}
} else {
LOG.warn("Error processing input file: URL not found for " + fileName + " or no privilege for reading file " + fileName);
}
}

if (tagFileStream != null) {
try {
tagFileStream.close();
} catch (Exception e) {
// Ignore
}
}
return fileURL;
}

private ServiceTags readFromFile() {

if (serviceTagsFileURL != null) {
try (
InputStream serviceTagsFileStream = serviceTagsFileURL.openStream();
Reader reader = new InputStreamReader(serviceTagsFileStream, Charset.forName("UTF-8"))
) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> RangerFileBasedTagRetriever.readFromFile: sourceFileName=" + serviceTagsFileName);
}

serviceTags = gsonBuilder.fromJson(reader, ServiceTags.class);
ServiceTags ret = null;

if (serviceTags.getTagVersion() <= lastKnownVersion) {
// No change in serviceTags
serviceTags = null;
} else {
String fileName;

fileName = serviceTagsFileName;

if (isInitial) {
isInitial = false;
if (serviceTagsFileURL != null) {
try (
InputStream fileStream = serviceTagsFileURL.openStream();
Reader reader = new InputStreamReader(fileStream, StandardCharsets.UTF_8)
) {

ret = gsonBuilder.fromJson(reader, ServiceTags.class);
if (deDupTags) {
final int countOfDuplicateTags = serviceTags.dedupTags();
LOG.info("Number of duplicate tags removed from the received serviceTags:[" + countOfDuplicateTags + "]. Number of tags in the de-duplicated serviceTags :[" + serviceTags.getTags().size() + "].");
final int countOfDuplicateTags = ret.dedupTags();
LOG.info("Number of duplicate tags removed from the received serviceTags:[" + countOfDuplicateTags + "]. Number of tags in the de-duplicated serviceTags :[" + ret.getTags().size() + "].");
}

} catch (IOException e) {
LOG.warn("Error processing input file: or no privilege for reading file " + fileName, e);
}
} catch (IOException e) {
LOG.warn("Error processing input file: or no privilege for reading file " + serviceTagsFileName);
throw e;
} else {
LOG.error("Error reading file: " + fileName);
}
} else {
LOG.error("Error reading file: " + serviceTagsFileName);
throw new Exception("serviceTagsFileURL is null!");

} else if (tagFilesCount > 0) {

currentTagFileIndex = currentTagFileIndex % tagFilesCount;
fileName = serviceTagsFileName + "_" + currentTagFileIndex + ".json";
URL fileURL = getTagFileURL(fileName);
if (fileURL != null) {
try (
InputStream fileStream = fileURL.openStream();
Reader reader = new InputStreamReader(fileStream, StandardCharsets.UTF_8)
) {

ret = gsonBuilder.fromJson(reader, ServiceTags.class);
currentTagFileIndex++;
if (deDupTags) {
final int countOfDuplicateTags = ret.dedupTags();
LOG.info("Number of duplicate tags removed from the received serviceTags:[" + countOfDuplicateTags + "]. Number of tags in the de-duplicated serviceTags :[" + ret.getTags().size() + "].");
}
} catch (IOException e) {
LOG.warn("Error processing input file: or no privilege for reading file " + fileName, e);
}
} else {
LOG.error("Error reading file: " + fileName);
}

}

if (LOG.isDebugEnabled()) {
LOG.debug("<== retrieveTags(lastKnownVersion=" + lastKnownVersion + ", lastActivationTimeInMillis=" + lastActivationTimeInMillis);
LOG.debug("<== RangerFileBasedTagRetriever.readFromFile: sourceFileName=" + fileName);
}

return serviceTags;
return ret;
}

}
Expand Down

0 comments on commit 4ecb2f8

Please sign in to comment.