From 3a294709d7fc5e8324cca6ebd3de27164c154c23 Mon Sep 17 00:00:00 2001
From: tballison
Date: Fri, 17 Nov 2023 14:48:24 -0500
Subject: [PATCH] NUTCH-3026 -- first steps towards statusOnly option in
IndexingJob
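
Adds an "indexer.statusonly" property and a matching -statusOnly
command-line flag. When enabled, the reducer emits one lightweight
document per URL carrying only status information (CrawlDb, fetch and
parse status, HTTP status code, content type and length, segment and
digest) and drops the parsed content.

A possible invocation, sketched with placeholder paths that are not
part of this patch:

    bin/nutch index crawl/crawldb crawl/segments/SEGMENT -statusOnly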
---
.../nutch/indexer/IndexerMapReduce.java | 140 +++++++++++++++++-
.../org/apache/nutch/indexer/IndexingJob.java | 18 ++-
2 files changed, 150 insertions(+), 8 deletions(-)
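
Reviewer note: a minimal sketch of driving the new option from code rather
than the CLI, assuming the existing IndexingJob(Configuration) constructor
and the NutchConfiguration helper; the paths are placeholders and this class
is not part of the patch.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.indexer.IndexingJob;
    import org.apache.nutch.util.NutchConfiguration;

    public class StatusOnlyIndexSketch {
      public static void main(String[] args) throws Exception {
        // Equivalent to: bin/nutch index <crawldb> <segment> -statusOnly
        IndexingJob indexer = new IndexingJob(NutchConfiguration.create());
        List<Path> segments = Arrays.asList(new Path("crawl/segments/SEGMENT")); // placeholder
        indexer.index(new Path("crawl/crawldb"), new Path("crawl/linkdb"), segments,
            false /* noCommit */, false /* deleteGone */, null /* params */,
            false /* filter */, false /* normalize */, false /* addBinaryContent */,
            false /* base64 */, true /* statusOnly */);
      }
    }

The statusOnly argument sets the new indexer.statusonly job property, which
IndexerReducer reads in setup().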
diff --git a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
index 1b8ff52eb0..075c5965aa 100644
--- a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
+++ b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
@@ -20,7 +20,10 @@
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.nutch.parse.ParseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.codec.binary.Base64;
@@ -64,7 +67,7 @@
*
*
* See
- * {@link #initMRJob(Path, Path, Collection, Job, boolean)}
+ * {@link #initMRJob(Path, Path, Collection, Job, boolean, boolean)}
* for details on the specific data structures and parameters required for
* indexing.
*
@@ -78,6 +81,8 @@ public class IndexerMapReduce extends Configured {
public static final String INDEXER_PARAMS = "indexer.additional.params";
public static final String INDEXER_DELETE = "indexer.delete";
public static final String INDEXER_NO_COMMIT = "indexer.nocommit";
+
+ public static final String INDEXER_STATUS_ONLY = "indexer.statusonly";
public static final String INDEXER_DELETE_ROBOTS_NOINDEX = "indexer.delete.robots.noindex";
public static final String INDEXER_DELETE_SKIPPED = "indexer.delete.skipped.by.indexingfilter";
public static final String INDEXER_SKIP_NOTMODIFIED = "indexer.skip.notmodified";
@@ -203,6 +208,7 @@ public static class IndexerReducer extends
private boolean deleteRobotsNoIndex = false;
private boolean deleteSkippedByIndexingFilter = false;
private boolean base64 = false;
+ private boolean statusOnly = false;
private IndexingFilters filters;
private ScoringFilters scfilters;
@@ -226,6 +232,7 @@ public void setup(Reducer.Context c
false);
skip = conf.getBoolean(INDEXER_SKIP_NOTMODIFIED, false);
base64 = conf.getBoolean(INDEXER_BINARY_AS_BASE64, false);
+ statusOnly = conf.getBoolean(INDEXER_STATUS_ONLY, false);
normalize = conf.getBoolean(URL_NORMALIZING, false);
filter = conf.getBoolean(URL_FILTERING, false);
@@ -251,7 +258,8 @@ public void reduce(Text key, Iterable<NutchWritable> values,
ParseText parseText = null;
for (NutchWritable val : values) {
- final Writable value = val.get(); // unwrap
+ final Writable value = val.get(); // unwrap
+
if (value instanceof Inlinks) {
inlinks = (Inlinks) value;
} else if (value instanceof CrawlDatum) {
@@ -295,6 +303,10 @@ public void reduce(Text key, Iterable values,
LOG.warn("Unrecognized type: {}", value.getClass());
}
}
+ if (statusOnly) {
+ reduceStatusOnly(key, context, inlinks, dbDatum, fetchDatum, content, parseData, parseText);
+ return;
+ }
// Whether to delete GONE or REDIRECTS
if (delete && fetchDatum != null) {
@@ -429,10 +441,130 @@ public void reduce(Text key, Iterable<NutchWritable> values,
NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
context.write(key, action);
}
+
+ /** Emit a document carrying only status information for the given URL. */
+ private void reduceStatusOnly(Text key, Reducer<Text, NutchWritable, Text, NutchIndexAction>.Context context,
+ Inlinks inlinks, CrawlDatum dbDatum, CrawlDatum fetchDatum, Content content,
+ ParseData parseData, ParseText parseText)
+ throws IOException, InterruptedException {
+
+ NutchDocument doc = new NutchDocument();
+ doc.add("id", key.toString());
+ updateDbDatum(dbDatum, doc);
+ updateFetchDatum(fetchDatum, doc);
+ updateContent(key, content, doc);
+ doc = updateParse(key, parseData, parseText, fetchDatum, inlinks, doc);
+ if (doc == null) {
+ // the document was discarded by an indexing filter
+ return;
+ }
+ LOG.info("Processing status for: {}", key);
+
+ context.getCounter("IndexerStatus", "indexed (add/update)").increment(1);
+ NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
+ context.write(key, action);
+ }
+
+ /**
+ * Add parse status fields and run the indexing filters. May return null if
+ * an indexing filter discards the document.
+ */
+ private NutchDocument updateParse(Text key, ParseData parseData, ParseText parseText,
+ CrawlDatum fetchDatum, Inlinks inlinks, NutchDocument doc) {
+ if (parseData == null) {
+ return doc;
+ }
+ ParseStatus status = parseData.getStatus();
+ String parseStatusMajorName = "UNKNOWN!";
+ if (status.getMajorCode() >= 0 &&
+ status.getMajorCode() < ParseStatus.majorCodes.length) {
+ parseStatusMajorName = ParseStatus.majorCodes[status.getMajorCode()];
+ }
+ if (status.getMessage() != null && status.getMessage().trim().length() > 0) {
+ doc.add("nutch.parse.message", status.getMessage());
+ }
+ doc.add("nutch.parse.status", parseStatusMajorName);
+ //TODO: add minor status
+
+ // add segment, used to map from merged index back to segment files
+ doc.add("nutch.segment", parseData.getContentMeta().get(Nutch.SEGMENT_NAME_KEY));
+
+ // add digest, used by dedup
+ doc.add("nutch.digest", parseData.getContentMeta().get(Nutch.SIGNATURE_KEY));
+
+ if (parseText != null) {
+ final Parse parse = new ParseImpl(parseText, parseData);
+ try {
+ doc = filters.filter(doc, parse, key, fetchDatum, inlinks);
+ if (doc == null) {
+ // discarded by an indexing filter
+ return null;
+ }
+ // drop the full-text content; only status information should be indexed
+ doc.removeField("content");
+ return doc;
+ } catch (IndexingException e) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Error filtering " + key + ": ", e);
+ }
+ //context.getCounter("IndexerStatus", "errors (IndexingFilter)").increment(1);
+ }
+ }
+ return doc;
+ }
+
+ // matches a header line that consists solely of a three-digit status code
+ private static final Pattern HTTP_STATUS_PATTERN = Pattern.compile("\\A(\\d\\d\\d)\\Z");
+
+ /** Extract the HTTP status code from the stored response headers, or -1 if unavailable. */
+ private int getStatus(Metadata metadata) {
+ String responseHeaders = metadata.get("_response.headers_");
+ if (responseHeaders == null) {
+ return -1;
+ }
+ String firstHeader = responseHeaders.split("\r\n")[0];
+
+ Matcher m = HTTP_STATUS_PATTERN.matcher(firstHeader.trim());
+ if (m.find()) {
+ return Integer.parseInt(m.group(1));
+ }
+ return -1;
+ }
+
+ private void updateContent(Text key, Content content, NutchDocument doc) {
+ if (content == null) {
+ return;
+ }
+ String contentType = content.getContentType();
+ doc.add("nutch.content.content-type", contentType);
+ if (content.getContent() != null) {
+ doc.add("nutch.content.length", Long.toString(content.getContent().length));
+ }
+ Metadata nutchMetadata = content.getMetadata();
+ int statusCode = getStatus(nutchMetadata);
+ if (statusCode > -1) {
+ doc.add("nutch.http.status", statusCode);
+ }
+ /*
+ //TODO -- do we want to add all of this?
+ //Dangerous to pass through http headers without prefixing them
+ //or sanitizing them?
+ if (nutchMetadata != null) {
+
+ for (String n : nutchMetadata.names()) {
+ String[] vals = nutchMetadata.getValues(n);
+ if (vals.length == 1) {
+ doc.add(n, vals[0]);
+ } else {
+ doc.add(n, nutchMetadata.getValues(n));
+ }
+ }
+ }*/
+ }
+
+ private void updateFetchDatum(CrawlDatum fetchDatum, NutchDocument doc) {
+ if (fetchDatum == null) {
+ return;
+ }
+ String fetchStatus = CrawlDatum.getStatusName(fetchDatum.getStatus());
+ doc.add("nutch.fetch.status", fetchStatus);
+ }
+
+ private void updateDbDatum(CrawlDatum dbDatum, NutchDocument doc) {
+ if (dbDatum == null) {
+ return;
+ }
+ String dbStatus = CrawlDatum.getStatusName(dbDatum.getStatus());
+ doc.add("nutch.db.status", dbStatus);
+ }
+
}
public static void initMRJob(Path crawlDb, Path linkDb,
- Collection<Path> segments, Job job, boolean addBinaryContent) throws IOException{
+ Collection<Path> segments, Job job, boolean addBinaryContent, boolean statusOnly) throws IOException{
Configuration conf = job.getConfiguration();
@@ -463,7 +595,7 @@ public static void initMRJob(Path crawlDb, Path linkDb,
FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
- if (addBinaryContent) {
+ if (addBinaryContent || statusOnly) {
FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
}
}
diff --git a/src/java/org/apache/nutch/indexer/IndexingJob.java b/src/java/org/apache/nutch/indexer/IndexingJob.java
index c3ddb4ae94..786f7f4066 100644
--- a/src/java/org/apache/nutch/indexer/IndexingJob.java
+++ b/src/java/org/apache/nutch/indexer/IndexingJob.java
@@ -96,13 +96,14 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
boolean filter, boolean normalize, boolean addBinaryContent)
throws IOException, InterruptedException, ClassNotFoundException {
index(crawlDb, linkDb, segments, noCommit, deleteGone, params, false,
- false, false, false);
+ false, false, false, false);
}
public void index(Path crawlDb, Path linkDb, List<Path> segments,
boolean noCommit, boolean deleteGone, String params,
boolean filter, boolean normalize, boolean addBinaryContent,
- boolean base64) throws IOException, InterruptedException, ClassNotFoundException {
+ boolean base64, boolean statusOnly) throws IOException, InterruptedException,
+ ClassNotFoundException {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
@@ -116,6 +117,7 @@ public void index(Path crawlDb, Path linkDb, List segments,
LOG.info("Indexer: deleting gone documents: {}", deleteGone);
LOG.info("Indexer: URL filtering: {}", filter);
LOG.info("Indexer: URL normalizing: {}", normalize);
+ LOG.info("Indexer: status only: {}", statusOnly);
if (addBinaryContent) {
if (base64) {
LOG.info("Indexer: adding binary content as Base64");
@@ -124,13 +126,15 @@ public void index(Path crawlDb, Path linkDb, List segments,
}
}
- IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job, addBinaryContent);
+ IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job,
+ addBinaryContent, statusOnly);
conf.setBoolean(IndexerMapReduce.INDEXER_DELETE, deleteGone);
conf.setBoolean(IndexerMapReduce.URL_FILTERING, filter);
conf.setBoolean(IndexerMapReduce.URL_NORMALIZING, normalize);
conf.setBoolean(IndexerMapReduce.INDEXER_BINARY_AS_BASE64, base64);
conf.setBoolean(IndexerMapReduce.INDEXER_NO_COMMIT, noCommit);
+ conf.setBoolean(IndexerMapReduce.INDEXER_STATUS_ONLY, statusOnly);
if (params != null) {
conf.set(IndexerMapReduce.INDEXER_PARAMS, params);
@@ -209,6 +213,8 @@ private static void usage() {
System.err.println(
"\t-addBinaryContent\tindex raw/binary content in field `binaryContent`");
System.err.println("\t-base64 \tuse Base64 encoding for binary content");
+ System.err.println("\t-statusOnly \tindex the status of all urls in the crawldb and skip " +
+ "the content");
System.err.println("");
}
@@ -233,6 +239,7 @@ public int run(String[] args) throws Exception {
boolean normalize = false;
boolean addBinaryContent = false;
boolean base64 = false;
+ boolean statusOnly = false;
for (int i = 0; i < args.length; i++) {
FileSystem fs = null;
@@ -272,6 +279,8 @@ public int run(String[] args) throws Exception {
* given
*/
crawlDb = new Path(args[i]);
+ } else if (args[i].equals("-statusOnly")) {
+ statusOnly = true;
} else {
// remaining arguments are segments
dir = new Path(args[i]);
@@ -289,7 +298,8 @@ public int run(String[] args) throws Exception {
}
try {
- index(crawlDb, linkDb, segments, noCommit, deleteGone, params, filter, normalize, addBinaryContent, base64);
+ index(crawlDb, linkDb, segments, noCommit, deleteGone, params, filter, normalize,
+ addBinaryContent, base64, statusOnly);
return 0;
} catch (final Exception e) {
LOG.error("Indexer: {}", StringUtils.stringifyException(e));