diff --git a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
index 1b8ff52eb..075c5965a 100644
--- a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
+++ b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
@@ -20,7 +20,10 @@ import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.nutch.parse.ParseStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.codec.binary.Base64;
@@ -64,7 +67,7 @@
  *
  *
  * See
- * {@link #initMRJob(Path, Path, Collection, Job, boolean)}
+ * {@link #initMRJob(Path, Path, Collection, Job, boolean, boolean)}
  * for details on the specific data structures and parameters required for
  * indexing.
  *
@@ -78,6 +81,8 @@ public class IndexerMapReduce extends Configured {
   public static final String INDEXER_PARAMS = "indexer.additional.params";
   public static final String INDEXER_DELETE = "indexer.delete";
   public static final String INDEXER_NO_COMMIT = "indexer.nocommit";
+
+  public static final String INDEXER_STATUS_ONLY = "indexer.statusonly";
   public static final String INDEXER_DELETE_ROBOTS_NOINDEX = "indexer.delete.robots.noindex";
   public static final String INDEXER_DELETE_SKIPPED = "indexer.delete.skipped.by.indexingfilter";
   public static final String INDEXER_SKIP_NOTMODIFIED = "indexer.skip.notmodified";
@@ -203,6 +208,7 @@ public static class IndexerReducer extends
     private boolean deleteRobotsNoIndex = false;
     private boolean deleteSkippedByIndexingFilter = false;
     private boolean base64 = false;
+    private boolean statusOnly = false;
     private IndexingFilters filters;
     private ScoringFilters scfilters;
@@ -226,6 +232,7 @@ public void setup(Reducer<Text, NutchWritable, Text, NutchIndexAction>.Context c
           false);
       skip = conf.getBoolean(INDEXER_SKIP_NOTMODIFIED, false);
       base64 = conf.getBoolean(INDEXER_BINARY_AS_BASE64, false);
+      statusOnly = conf.getBoolean(INDEXER_STATUS_ONLY, false);
       normalize = conf.getBoolean(URL_NORMALIZING, false);
       filter = conf.getBoolean(URL_FILTERING, false);
@@ -251,7 +258,8 @@ public void reduce(Text key, Iterable<NutchWritable> values,
       ParseText parseText = null;
 
       for (NutchWritable val : values) {
         final Writable value = val.get(); // unwrap
+
         if (value instanceof Inlinks) {
           inlinks = (Inlinks) value;
         } else if (value instanceof CrawlDatum) {
@@ -295,6 +303,10 @@ public void reduce(Text key, Iterable<NutchWritable> values,
           LOG.warn("Unrecognized type: {}", value.getClass());
         }
       }
+      if (statusOnly) {
+        reduceStatusOnly(key, context, inlinks, dbDatum, fetchDatum, content, parseData, parseText);
+        return;
+      }
 
       // Whether to delete GONE or REDIRECTS
       if (delete && fetchDatum != null) {
@@ -429,10 +441,130 @@ public void reduce(Text key, Iterable<NutchWritable> values,
       NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
       context.write(key, action);
     }
+
+    private void reduceStatusOnly(Text key,
+        Reducer<Text, NutchWritable, Text, NutchIndexAction>.Context context,
+        Inlinks inlinks, CrawlDatum dbDatum, CrawlDatum fetchDatum,
+        Content content, ParseData parseData, ParseText parseText)
+        throws IOException, InterruptedException {
+
+      LOG.info("Processing status for: {}", key);
+      NutchDocument doc = new NutchDocument();
+      doc.add("id", key.toString());
+      updateDbDatum(dbDatum, doc);
+      updateFetchDatum(fetchDatum, doc);
+      updateContent(key, content, doc);
+      doc = updateParse(key, parseData, parseText, fetchDatum, inlinks, doc);
+
+      context.getCounter("IndexerStatus", "indexed (add/update)").increment(1);
+      NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
+      context.write(key, action);
+    }
+
+    private NutchDocument updateParse(Text key, ParseData parseData,
+        ParseText parseText, CrawlDatum fetchDatum, Inlinks inlinks,
+        NutchDocument doc) {
+      if (parseData == null) {
+        return doc;
+      }
+      ParseStatus status = parseData.getStatus();
+      String parseStatusMajorName = "UNKNOWN!";
+      if (status.getMajorCode() >= 0
+          && status.getMajorCode() < ParseStatus.majorCodes.length) {
+        parseStatusMajorName = ParseStatus.majorCodes[status.getMajorCode()];
+      }
+      if (status.getMessage() != null && status.getMessage().trim().length() > 0) {
+        doc.add("nutch.parse.message", status.getMessage());
+      }
+      doc.add("nutch.parse.status", parseStatusMajorName);
+      // TODO: add the minor status code as well
+
+      // add segment, used to map from merged index back to segment files
+      doc.add("nutch.segment", parseData.getContentMeta().get(Nutch.SEGMENT_NAME_KEY));
+
+      // add digest, used by dedup
+      doc.add("nutch.digest", parseData.getContentMeta().get(Nutch.SIGNATURE_KEY));
+
+      if (parseText != null) {
+        final Parse parse = new ParseImpl(parseText, parseData);
+        try {
+          doc = filters.filter(doc, parse, key, fetchDatum, inlinks);
+          // remove the content: status-only documents should not carry it
+          doc.removeField("content");
+          return doc;
+        } catch (IndexingException e) {
+          LOG.warn("Error filtering {}: ", key, e);
+          //context.getCounter("IndexerStatus", "errors (IndexingFilter)").increment(1);
+        }
+      }
+      return doc;
+    }
+
+    /**
+     * Extract the HTTP status code from the raw response headers, assuming
+     * that the first "\r\n"-separated line of the "_response.headers_"
+     * metadata value holds the bare three-digit status code.
+     *
+     * @return the status code, or -1 if it cannot be determined
+     */
+    private int getStatus(Metadata metadata) {
+      if (metadata.get("_response.headers_") == null) {
+        return -1;
+      }
+      String[] headers = metadata.get("_response.headers_").split("\r\n");
+      String firstHeader = headers[0];
+
+      Matcher m = Pattern.compile("\\A(\\d\\d\\d)\\Z").matcher(firstHeader.trim());
+      if (m.find()) {
+        return Integer.parseInt(m.group(1));
+      }
+      return -1;
+    }
+
+    private void updateContent(Text key, Content content, NutchDocument doc) {
+      if (content == null) {
+        return;
+      }
+      String contentType = content.getContentType();
+      doc.add("nutch.content.content-type", contentType);
+      if (content.getContent() != null) {
+        doc.add("nutch.content.length", Long.toString(content.getContent().length));
+      }
+      Metadata nutchMetadata = content.getMetadata();
+      int statusCode = getStatus(nutchMetadata);
+      if (statusCode > -1) {
+        doc.add("nutch.http.status", statusCode);
+      }
+      /*
+      // TODO -- do we want to add all of this? It is dangerous to pass
+      // through http headers without prefixing or sanitizing them.
+      if (nutchMetadata != null) {
+        for (String n : nutchMetadata.names()) {
+          String[] vals = nutchMetadata.getValues(n);
+          if (vals.length == 1) {
+            doc.add(n, vals[0]);
+          } else {
+            doc.add(n, nutchMetadata.getValues(n));
+          }
+        }
+      }
+      */
+    }
+
+    private void updateFetchDatum(CrawlDatum fetchDatum, NutchDocument doc) {
+      if (fetchDatum == null) {
+        return;
+      }
+      String fetchStatus = CrawlDatum.getStatusName(fetchDatum.getStatus());
+      doc.add("nutch.fetch.status", fetchStatus);
+    }
+
+    private void updateDbDatum(CrawlDatum dbDatum, NutchDocument doc) {
+      if (dbDatum == null) {
+        return;
+      }
+      String dbStatus = CrawlDatum.getStatusName(dbDatum.getStatus());
+      doc.add("nutch.db.status", dbStatus);
+    }
+
   }
 
   public static void initMRJob(Path crawlDb, Path linkDb,
-      Collection<Path> segments, Job job, boolean addBinaryContent) throws IOException {
+      Collection<Path> segments, Job job, boolean addBinaryContent,
+      boolean statusOnly) throws IOException {
 
     Configuration conf = job.getConfiguration();
@@ -463,7 +595,7 @@ public static void initMRJob(Path crawlDb, Path linkDb,
       FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
       FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
 
-      if (addBinaryContent) {
+      if (addBinaryContent || statusOnly) {
         FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
       }
     }
diff --git a/src/java/org/apache/nutch/indexer/IndexingJob.java b/src/java/org/apache/nutch/indexer/IndexingJob.java
index c3ddb4ae9..786f7f406 100644
--- a/src/java/org/apache/nutch/indexer/IndexingJob.java
+++ b/src/java/org/apache/nutch/indexer/IndexingJob.java
@@ -96,13 +96,14 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
       boolean filter, boolean normalize, boolean addBinaryContent)
       throws IOException, InterruptedException, ClassNotFoundException {
     index(crawlDb, linkDb, segments, noCommit, deleteGone, params, false,
-        false, false, false);
+        false, false, false, false);
   }
 
   public void index(Path crawlDb, Path linkDb, List<Path> segments,
       boolean noCommit, boolean deleteGone, String params,
       boolean filter, boolean normalize, boolean addBinaryContent,
-      boolean base64) throws IOException, InterruptedException, ClassNotFoundException {
+      boolean base64, boolean statusOnly) throws IOException, InterruptedException,
+      ClassNotFoundException {
     StopWatch stopWatch = new StopWatch();
     stopWatch.start();
@@ -116,6 +117,7 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
     LOG.info("Indexer: deleting gone documents: {}", deleteGone);
     LOG.info("Indexer: URL filtering: {}", filter);
     LOG.info("Indexer: URL normalizing: {}", normalize);
+    LOG.info("Indexer: status only: {}", statusOnly);
     if (addBinaryContent) {
       if (base64) {
         LOG.info("Indexer: adding binary content as Base64");
@@ -124,13 +126,15 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
       }
     }
 
-    IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job, addBinaryContent);
+    IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job,
+        addBinaryContent, statusOnly);
 
     conf.setBoolean(IndexerMapReduce.INDEXER_DELETE, deleteGone);
     conf.setBoolean(IndexerMapReduce.URL_FILTERING, filter);
     conf.setBoolean(IndexerMapReduce.URL_NORMALIZING, normalize);
     conf.setBoolean(IndexerMapReduce.INDEXER_BINARY_AS_BASE64, base64);
     conf.setBoolean(IndexerMapReduce.INDEXER_NO_COMMIT, noCommit);
+    conf.setBoolean(IndexerMapReduce.INDEXER_STATUS_ONLY, statusOnly);
 
     if (params != null) {
       conf.set(IndexerMapReduce.INDEXER_PARAMS, params);
@@ -209,6 +213,8 @@ private static void usage() {
     System.err.println(
         "\t-addBinaryContent\tindex raw/binary content in field `binaryContent`");
     System.err.println("\t-base64 \tuse Base64 encoding for binary content");
+    System.err.println("\t-statusOnly \tindex the status of all URLs in the crawldb "
+        + "and skip the content");
     System.err.println("");
   }
@@ -233,6 +239,7 @@ public int run(String[] args) throws Exception {
     boolean normalize = false;
     boolean addBinaryContent = false;
     boolean base64 = false;
+    boolean statusOnly = false;
 
     for (int i = 0; i < args.length; i++) {
       FileSystem fs = null;
@@ -272,6 +279,8 @@ public int run(String[] args) throws Exception {
          * given */
         crawlDb = new Path(args[i]);
+      } else if (args[i].equals("-statusOnly")) {
+        statusOnly = true;
       } else {
         // remaining arguments are segments
         dir = new Path(args[i]);
@@ -289,7 +298,8 @@ public int run(String[] args) throws Exception {
     }
 
     try {
-      index(crawlDb, linkDb, segments, noCommit, deleteGone, params, filter, normalize, addBinaryContent, base64);
+      index(crawlDb, linkDb, segments, noCommit, deleteGone, params, filter, normalize,
+          addBinaryContent, base64, statusOnly);
       return 0;
     } catch (final Exception e) {
       LOG.error("Indexer: {}", StringUtils.stringifyException(e));
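
Reviewer notes (illustrative, not part of the patch):

With -statusOnly, each URL is emitted as a slim NutchDocument carrying only
status metadata. Based on the fields added in reduceStatusOnly() above, a
resulting document would look roughly like this (values are made up for
illustration, not taken from a real crawl):

  id                          http://example.com/
  nutch.db.status             db_fetched
  nutch.fetch.status          fetch_success
  nutch.content.content-type  text/html
  nutch.content.length        24863
  nutch.http.status           200
  nutch.parse.status          success
  nutch.segment               20240101000000
  nutch.digest                0e8dcd412c34b90d65a1257e62c3c4c5

nutch.parse.message is added as well, but only when the parse status carries
a non-empty message.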
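Assuming the standard bin/nutch entry point and an existing crawl layout
(all paths below are placeholders), a status-only run could be launched like
any other indexing job:

  bin/nutch index crawl/crawldb -linkdb crawl/linkdb \
      crawl/segments/20240101000000 -statusOnly -noCommit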
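One caveat on getStatus(): the \A ... \Z anchors mean the first
"\r\n"-separated line of "_response.headers_" must be a bare three-digit
code; a full status line such as "HTTP/1.1 200 OK" yields -1. A minimal,
self-contained sketch of that behaviour (plain Java, not Nutch code):

  import java.util.regex.Matcher;
  import java.util.regex.Pattern;

  public class StatusLineCheck {
    // Same pattern as getStatus(): the whole trimmed line must be 3 digits.
    private static final Pattern BARE_CODE = Pattern.compile("\\A(\\d\\d\\d)\\Z");

    public static void main(String[] args) {
      for (String first : new String[] { "200", "HTTP/1.1 200 OK" }) {
        Matcher m = BARE_CODE.matcher(first.trim());
        int code = m.find() ? Integer.parseInt(m.group(1)) : -1;
        // prints "200 -> 200" and "HTTP/1.1 200 OK -> -1"
        System.out.println(first + " -> " + code);
      }
    }
  }

If the protocol plugin in use stores the full status line instead, the
pattern would need to be relaxed, e.g. to match the code anywhere in the
first line.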