NUTCH-3026 -- add statusOnly as an indexing option #799

Closed
wants to merge 1 commit into from
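
The new -statusOnly flag makes the indexer emit one lightweight document per URL, carrying CrawlDb, fetch, HTTP, and parse status (plus segment name and digest) while skipping the page content itself. A possible invocation, assuming a standard crawl directory layout (the paths below are illustrative, not part of this PR):

bin/nutch index crawl/crawldb -linkdb crawl/linkdb crawl/segments/20240101000000 -statusOnly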
140 changes: 136 additions & 4 deletions src/java/org/apache/nutch/indexer/IndexerMapReduce.java
@@ -20,7 +20,10 @@
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.nutch.parse.ParseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.codec.binary.Base64;
@@ -64,7 +67,7 @@
* </p>
* <p>
* See
* {@link #initMRJob(Path, Path, Collection, Job, boolean)}
* {@link #initMRJob(Path, Path, Collection, Job, boolean, boolean)}
* for details on the specific data structures and parameters required for
* indexing.
* </p>
@@ -78,6 +81,8 @@ public class IndexerMapReduce extends Configured {
public static final String INDEXER_PARAMS = "indexer.additional.params";
public static final String INDEXER_DELETE = "indexer.delete";
public static final String INDEXER_NO_COMMIT = "indexer.nocommit";

public static final String INDEXER_STATUS_ONLY = "indexer.statusonly";
public static final String INDEXER_DELETE_ROBOTS_NOINDEX = "indexer.delete.robots.noindex";
public static final String INDEXER_DELETE_SKIPPED = "indexer.delete.skipped.by.indexingfilter";
public static final String INDEXER_SKIP_NOTMODIFIED = "indexer.skip.notmodified";
@@ -203,6 +208,7 @@ public static class IndexerReducer extends
private boolean deleteRobotsNoIndex = false;
private boolean deleteSkippedByIndexingFilter = false;
private boolean base64 = false;
private boolean statusOnly = false;
private IndexingFilters filters;
private ScoringFilters scfilters;

@@ -226,6 +232,7 @@ public void setup(Reducer<Text, NutchWritable, Text, NutchIndexAction>.Context c
false);
skip = conf.getBoolean(INDEXER_SKIP_NOTMODIFIED, false);
base64 = conf.getBoolean(INDEXER_BINARY_AS_BASE64, false);
statusOnly = conf.getBoolean(INDEXER_STATUS_ONLY, false);

normalize = conf.getBoolean(URL_NORMALIZING, false);
filter = conf.getBoolean(URL_FILTERING, false);
@@ -251,7 +258,8 @@ public void reduce(Text key, Iterable<NutchWritable> values,
ParseText parseText = null;

for (NutchWritable val : values) {
final Writable value = val.get(); // unwrap

if (value instanceof Inlinks) {
inlinks = (Inlinks) value;
} else if (value instanceof CrawlDatum) {
@@ -295,6 +303,10 @@ public void reduce(Text key, Iterable<NutchWritable> values,
LOG.warn("Unrecognized type: {}", value.getClass());
}
}
// status-only mode: emit a minimal status document and skip regular content indexing
if (statusOnly) {
reduceStatusOnly(key, context, inlinks, dbDatum, fetchDatum, content, parseData, parseText);
return;
}

// Whether to delete GONE or REDIRECTS
if (delete && fetchDatum != null) {
@@ -429,10 +441,130 @@ public void reduce(Text key, Iterable<NutchWritable> values,
NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
context.write(key, action);
}

/**
 * Build and emit a minimal status-only document (no content field),
 * recording CrawlDb, fetch, HTTP, and parse status for the URL.
 */
private void reduceStatusOnly(Text key,
Reducer<Text, NutchWritable, Text, NutchIndexAction>.Context context,
Inlinks inlinks, CrawlDatum dbDatum, CrawlDatum fetchDatum,
Content content, ParseData parseData, ParseText parseText)
throws IOException, InterruptedException {

NutchDocument doc = new NutchDocument();
doc.add("id", key.toString());
updateDbDatum(dbDatum, doc);
updateFetchDatum(fetchDatum, doc);
updateContent(key, content, doc);
doc = updateParse(key, parseData, parseText, fetchDatum, inlinks, doc);
LOG.info("processing status for: {}", key);

context.getCounter("IndexerStatus", "indexed (add/update)").increment(1);
NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
context.write(key, action);
}

private NutchDocument updateParse(Text key, ParseData parseData, ParseText parseText,
CrawlDatum fetchDatum, Inlinks inlinks, NutchDocument doc) {
if (parseData == null) {
return doc;
}
ParseStatus status = parseData.getStatus();
String parseStatusMajorName = "UNKNOWN!";
if (status.getMajorCode() >= 0 &&
status.getMajorCode() < ParseStatus.majorCodes.length) {
parseStatusMajorName = ParseStatus.majorCodes[status.getMajorCode()];
}
if (status.getMessage() != null && status.getMessage().trim().length() > 0) {
doc.add("nutch.parse.message", status.getMessage());
}
doc.add("nutch.parse.status", parseStatusMajorName);
//TODO: add minor status

// add segment, used to map from merged index back to segment files
doc.add("nutch.segment", parseData.getContentMeta().get(Nutch.SEGMENT_NAME_KEY));

// add digest, used by dedup
doc.add("nutch.digest", parseData.getContentMeta().get(Nutch.SIGNATURE_KEY));

// parseData is known non-null here; only parseText still needs checking
if (parseText != null) {
final Parse parse = new ParseImpl(parseText, parseData);
try {
doc = filters.filter(doc, parse, key, fetchDatum, inlinks);
// remove the content field: status-only documents carry no body text
doc.removeField("content");
return doc;
} catch (IndexingException e) {
LOG.warn("Error filtering {}: ", key, e);
// TODO: pass the Context in so the "errors (IndexingFilter)" counter
// can be incremented here, as in the regular reduce path
}
}
return doc;
}

/**
 * Extract the HTTP status code from the raw response headers stored by the
 * protocol plugin. Expects a bare three-digit status code on the first
 * CRLF-separated header line; returns -1 if no headers were recorded or
 * the first line has any other shape.
 */
private int getStatus(Metadata metadata) {
String headersRaw = metadata.get("_response.headers_");
if (headersRaw == null) {
return -1;
}
String firstHeader = headersRaw.split("\r\n")[0];

Matcher m = Pattern.compile("\\A(\\d\\d\\d)\\Z").matcher(firstHeader.trim());
if (m.find()) {
return Integer.parseInt(m.group(1));
}
return -1;
}

private void updateContent(Text key, Content content, NutchDocument doc) {
if (content == null) {
return;
}
String contentType = content.getContentType();
doc.add("nutch.content.content-type", contentType);
if (content.getContent() != null) {
doc.add("nutch.content.length", Long.toString(content.getContent().length));
}
Metadata nutchMetadata = content.getMetadata();
int statusCode = getStatus(nutchMetadata);
if (statusCode > -1) {
doc.add("nutch.http.status", statusCode);
}
/*
//TODO -- do we want to add all of this?
//Dangerous to pass through http headers without prefixing them
//or sanitizing them?
if (nutchMetadata != null) {

for (String n : nutchMetadata.names()) {
String[] vals = nutchMetadata.getValues(n);
if (vals.length == 1) {
doc.add(n, vals[0]);
} else {
doc.add(n, nutchMetadata.getValues(n));
}
}
}*/
}

private void updateFetchDatum(CrawlDatum fetchDatum, NutchDocument doc) {
if (fetchDatum == null) {
return;
}
String fetchStatus = CrawlDatum.getStatusName(fetchDatum.getStatus());
doc.add("nutch.fetch.status", fetchStatus);
}

private void updateDbDatum(CrawlDatum dbDatum, NutchDocument doc) {
if (dbDatum == null) {
return;
}
String dbStatus = CrawlDatum.getStatusName(dbDatum.getStatus());
doc.add("nutch.db.status", dbStatus);
}

}

public static void initMRJob(Path crawlDb, Path linkDb,
Collection<Path> segments, Job job, boolean addBinaryContent) throws IOException{
Collection<Path> segments, Job job, boolean addBinaryContent, boolean statusOnly) throws IOException {

Configuration conf = job.getConfiguration();

@@ -463,7 +595,7 @@ public static void initMRJob(Path crawlDb, Path linkDb,
FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));

if (addBinaryContent) {
// status-only documents also read the segment content, to recover
// the content type and HTTP status
if (addBinaryContent || statusOnly) {
FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
}
}
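A minimal, self-contained sketch (not part of the PR) of the status-line parsing that getStatus(...) above performs. It assumes the protocol plugin stored the raw response headers under "_response.headers_" with a bare three-digit status code on the first CRLF-separated line; note that a full status line such as "HTTP/1.1 200 OK" deliberately yields -1:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StatusLineParseDemo {
  // Same pattern as getStatus(): the whole (trimmed) first line must be three digits.
  private static final Pattern STATUS = Pattern.compile("\\A(\\d\\d\\d)\\Z");

  static int parseStatus(String rawHeaders) {
    if (rawHeaders == null) {
      return -1; // headers were never recorded for this URL
    }
    String firstLine = rawHeaders.split("\r\n")[0].trim();
    Matcher m = STATUS.matcher(firstLine);
    return m.find() ? Integer.parseInt(m.group(1)) : -1;
  }

  public static void main(String[] args) {
    System.out.println(parseStatus("200\r\nContent-Type: text/html")); // 200
    System.out.println(parseStatus("HTTP/1.1 200 OK\r\nServer: x"));   // -1
    System.out.println(parseStatus(null));                             // -1
  }
}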
18 changes: 14 additions & 4 deletions src/java/org/apache/nutch/indexer/IndexingJob.java
@@ -96,13 +96,14 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
boolean filter, boolean normalize, boolean addBinaryContent)
throws IOException, InterruptedException, ClassNotFoundException {
index(crawlDb, linkDb, segments, noCommit, deleteGone, params, false,
false, false, false);
false, false, false, false);
}

public void index(Path crawlDb, Path linkDb, List<Path> segments,
boolean noCommit, boolean deleteGone, String params,
boolean filter, boolean normalize, boolean addBinaryContent,
boolean base64) throws IOException, InterruptedException, ClassNotFoundException {
boolean base64, boolean statusOnly) throws IOException, InterruptedException,
ClassNotFoundException {

StopWatch stopWatch = new StopWatch();
stopWatch.start();
@@ -116,6 +117,7 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
LOG.info("Indexer: deleting gone documents: {}", deleteGone);
LOG.info("Indexer: URL filtering: {}", filter);
LOG.info("Indexer: URL normalizing: {}", normalize);
LOG.info("Indexer: status only: {}", statusOnly);
if (addBinaryContent) {
if (base64) {
LOG.info("Indexer: adding binary content as Base64");
@@ -124,13 +126,15 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
}
}

IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job, addBinaryContent);
IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job,
addBinaryContent, statusOnly);

conf.setBoolean(IndexerMapReduce.INDEXER_DELETE, deleteGone);
conf.setBoolean(IndexerMapReduce.URL_FILTERING, filter);
conf.setBoolean(IndexerMapReduce.URL_NORMALIZING, normalize);
conf.setBoolean(IndexerMapReduce.INDEXER_BINARY_AS_BASE64, base64);
conf.setBoolean(IndexerMapReduce.INDEXER_NO_COMMIT, noCommit);
conf.setBoolean(IndexerMapReduce.INDEXER_STATUS_ONLY, statusOnly);

if (params != null) {
conf.set(IndexerMapReduce.INDEXER_PARAMS, params);
@@ -209,6 +213,8 @@ private static void usage() {
System.err.println(
"\t-addBinaryContent\tindex raw/binary content in field `binaryContent`");
System.err.println("\t-base64 \tuse Base64 encoding for binary content");
System.err.println("\t-statusOnly \tindex the status of all urls in the crawldb and skip " +
"the content");
System.err.println("");
}

@@ -233,6 +239,7 @@ public int run(String[] args) throws Exception {
boolean normalize = false;
boolean addBinaryContent = false;
boolean base64 = false;
boolean statusOnly = false;

for (int i = 0; i < args.length; i++) {
FileSystem fs = null;
@@ -272,6 +279,8 @@ public int run(String[] args) throws Exception {
* given
*/
crawlDb = new Path(args[i]);
} else if (args[i].equals("-statusOnly")) {
statusOnly = true;
} else {
// remaining arguments are segments
dir = new Path(args[i]);
@@ -289,7 +298,8 @@
}

try {
index(crawlDb, linkDb, segments, noCommit, deleteGone, params, filter, normalize, addBinaryContent, base64);
index(crawlDb, linkDb, segments, noCommit, deleteGone, params, filter, normalize,
addBinaryContent, base64, statusOnly);
return 0;
} catch (final Exception e) {
LOG.error("Indexer: {}", StringUtils.stringifyException(e));
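For completeness, a hedged sketch of driving the PR's new index(...) overload programmatically rather than via the CLI. The paths and segment name are illustrative; a real run also needs the usual Nutch setup (index writers, plugin directory) on the classpath:

import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.nutch.indexer.IndexingJob;
import org.apache.nutch.util.NutchConfiguration;

public class StatusOnlyIndexDemo {
  public static void main(String[] args) throws Exception {
    IndexingJob indexer = new IndexingJob();
    indexer.setConf(NutchConfiguration.create());
    // Argument order follows the overload added in this PR.
    indexer.index(new Path("crawl/crawldb"), new Path("crawl/linkdb"),
        List.of(new Path("crawl/segments/20240101000000")),
        true,   // noCommit
        false,  // deleteGone
        null,   // params
        false,  // filter
        false,  // normalize
        false,  // addBinaryContent
        false,  // base64
        true);  // statusOnly
  }
}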