diff --git a/cdx_writer.py b/cdx_writer.py index a37bb34..da74df8 100755 --- a/cdx_writer.py +++ b/cdx_writer.py @@ -11,8 +11,6 @@ The functions that start with "get_" (as opposed to "parse_") are called by the dispatch loop in make_cdx using getattr(). """ -from warctools import ArchiveRecord #from https://bitbucket.org/rajbot/warc-tools -from surt import surt #from https://github.com/rajbot/surt import os import re @@ -21,19 +19,32 @@ import chardet import hashlib import json -import urllib import urlparse -from datetime import datetime -from optparse import OptionParser +from datetime import datetime +from optparse import OptionParser + + +from warctools import ArchiveRecord +# from https://bitbucket.org/rajbot/warc-tools +from surt import surt # from https://github.com/rajbot/surt class ParseError(Exception): pass + class CDX_Writer(object): # init() - #___________________________________________________________________________ - def __init__(self, file, out_file=sys.stdout, format="N b a m s k r M S V g", use_full_path=False, file_prefix=None, all_records=False, screenshot_mode=False, exclude_list=None, stats_file=None): + def __init__(self, + file, + out_file=sys.stdout, + format="N b a m s k r M S V g", + use_full_path=False, + file_prefix=None, + all_records=False, + screenshot_mode=False, + exclude_list=None, + stats_file=None): self.field_map = {'M': 'AIF meta tags', 'N': 'massaged url', @@ -46,30 +57,32 @@ def __init__(self, file, out_file=sys.stdout, format="N b a m s k r M S V g", us 'm': 'mime type', 'r': 'redirect', 's': 'response code', - } + } - self.file = file + self.file = file self.out_file = out_file self.format = format - self.all_records = all_records + self.all_records = all_records self.screenshot_mode = screenshot_mode self.crlf_pattern = re.compile('\r?\n\r?\n') - self.response_pattern = re.compile('^application/http;\s*msgtype=response$', re.I) - - #similar to what what the wayback uses: - self.fake_build_version = "archive-commons.0.0.1-SNAPSHOT-20120112102659-python" - - #these fields are set for each record in the warc - self.offset = 0 - self.surt = None - self.mime_type = None - self.headers = None - self.content = None - self.meta_tags = None + self.response_pattern = re.compile( + '^application/http;\s*msgtype=response$', re.I) + + # similar to what the wayback uses: + self.fake_build_version = ( + "archive-commons.0.0.1-SNAPSHOT-20120112102659-python") + + # these fields are set for each record in the warc + self.offset = 0 + self.surt = None + self.mime_type = None + self.headers = None + self.content = None + self.meta_tags = None self.response_code = None - #Large html files cause lxml to segfault - #problematic file was 154MB, we'll stop at 5MB + # Large html files cause lxml to segfault + # problematic file was 154MB, we'll stop at 5MB self.lxml_parse_limit = 5 * 1024 * 1024 if use_full_path: @@ -81,7 +94,7 @@ def __init__(self, file, out_file=sys.stdout, format="N b a m s k r M S V g", us if exclude_list: if not os.path.exists(exclude_list): - raise IOError, "Exclude file not found" + raise IOError("Exclude file not found") self.excludes = [] f = open(exclude_list, 'r') for line in f: @@ -94,19 +107,17 @@ def __init__(self, file, out_file=sys.stdout, format="N b a m s k r M S V g", us if stats_file: if os.path.exists(stats_file): - raise IOError, "Stats file already exists" + raise IOError("Stats file already exists") self.stats_file = stats_file else: self.stats_file = None - # parse_http_header() - 
#___________________________________________________________________________ def parse_http_header(self, header_name): if self.headers is None: return None - pattern = re.compile(header_name+':\s*(.+)', re.I) + pattern = re.compile(header_name + ':\s*(.+)', re.I) for line in iter(self.headers): m = pattern.match(line) if m: @@ -114,13 +125,13 @@ def parse_http_header(self, header_name): return None # parse_http_content_type_header() - #___________________________________________________________________________ def parse_http_content_type_header(self, record): content_type = self.parse_http_header('content-type') if content_type is None: return 'unk' - # some http responses end abruptly: ...Content-Length: 0\r\nConnection: close\r\nContent-Type: \r\n\r\n\r\n\r\n' + # some http responses end abruptly: ...Content-Length: 0\r\nConnection: + # close\r\nContent-Type: \r\n\r\n\r\n\r\n' content_type = content_type.strip() if '' == content_type: return 'unk' @@ -134,9 +145,7 @@ def parse_http_content_type_header(self, record): else: return 'unk' - # parse_charset() - #___________________________________________________________________________ def parse_charset(self): charset = None charset_pattern = re.compile('charset\s*=\s*([a-z0-9_\-]+)', re.I) @@ -147,7 +156,6 @@ def parse_charset(self): if m: charset = m.group(1) - if charset is None and self.meta_tags is not None: content_type = self.meta_tags.get('content-type') if content_type: @@ -161,15 +169,16 @@ def parse_charset(self): return charset # parse_meta_tags - #___________________________________________________________________________ def parse_meta_tags(self, record): """We want to parse meta tags in , even if not direct children. e.g. What should we do about multiple meta tags with the same name? - currently, we append the content attribs together with a comma seperator. + currently, we append the content attribs together with a comma + seperator. - We use either the 'name' or 'http-equiv' attrib as the meta_tag dict key. + We use either the 'name' or 'http-equiv' attrib as the meta_tag dict + key. 
""" if not ('response' == record.type and 'text/html' == self.mime_type): @@ -180,12 +189,12 @@ def parse_meta_tags(self, record): meta_tags = {} - #lxml.html can't parse blank documents + # lxml.html can't parse blank documents html_str = self.content.strip() if '' == html_str: return meta_tags - #lxml can't handle large documents + # lxml can't handle large documents if record.content_length > self.lxml_parse_limit: return meta_tags @@ -195,7 +204,7 @@ def parse_meta_tags(self, record): # so we're going to give up on lxml and use regexes to parse html :( meta_tags = {} - #we only want to look for meta tags that occur before the tag + # we only want to look for meta tags that occur before the tag head_limit = None m = re.search('()', html_str, re.I) if m: @@ -208,13 +217,18 @@ def parse_meta_tags(self, record): name = None content = None - m = re.search(r'''\b(?:name|http-equiv)\s*=\s*(['"]?)(.*?)(\1)[\s/>]''', x.group(1), re.I) + m = re.search( + r'''\ + \b(?:name|http-equiv)\s*=\s*(['"]?)(.*?)(\1)[\s/>]''', + x.group(1), re.I) if m: name = m.group(2).lower() else: continue - m = re.search(r'''\bcontent\s*=\s*(['"]?)(.*?)(\1)[\s/>]''', x.group(1), re.I) + m = re.search( + r'''\bcontent\s*=\s*(['"]?)(.*?)(\1)[\s/>]''', x.group(1), + re.I) if m: content = m.group(2) else: @@ -224,14 +238,12 @@ def parse_meta_tags(self, record): meta_tags[name] = content else: if 'refresh' != name: - #for redirect urls, we only want the first refresh tag + # for redirect urls, we only want the first refresh tag meta_tags[name] += ',' + content return meta_tags - # get_AIF_meta_tags() //field "M" - #___________________________________________________________________________ def get_AIF_meta_tags(self, record): """robot metatags, if present, should be in this order: A, F, I """ @@ -263,7 +275,7 @@ def get_AIF_meta_tags(self, record): try: if int(sfps[0]) > 0: s += 'P' - except ValueError as ex: + except ValueError: pass if s: @@ -271,9 +283,7 @@ def get_AIF_meta_tags(self, record): else: return '-' - # get_massaged_url() //field "N" - #___________________________________________________________________________ def get_massaged_url(self, record, use_precalculated_value=True): if use_precalculated_value: return self.surt @@ -283,16 +293,14 @@ def get_massaged_url(self, record, use_precalculated_value=True): else: url = record.url if self.screenshot_mode: - url = 'http://web.archive.org/screenshot/'+url + url = 'http://web.archive.org/screenshot/' + url try: return surt(url) except: return self.get_original_url(record) - # get_compressed_record_size() //field "S" - #___________________________________________________________________________ def get_compressed_record_size(self, record): size = record.compressed_record_size if size is None: @@ -300,15 +308,11 @@ def get_compressed_record_size(self, record): return str(size) - # get_compressed_arc_file_offset() //field "V" - #___________________________________________________________________________ def get_compressed_arc_file_offset(self, record): return str(self.offset) - # get_original_url() //field "a" - #___________________________________________________________________________ def get_original_url(self, record): if 'warcinfo' == record.type: url = 'warcinfo:/%s/%s' % (self.file, self.fake_build_version) @@ -318,7 +322,8 @@ def get_original_url(self, record): # There are few arc files from 2002 that have non-ascii characters in # the url field. 
These are not utf-8 characters, and the charset of the - # page might not be specified, so use chardet to try and make these usable. + # page might not be specified, so use chardet to try and make these + # usable. if isinstance(url, str): try: url.decode('ascii') @@ -326,21 +331,25 @@ def get_original_url(self, record): enc = chardet.detect(url) if enc and enc['encoding']: if 'EUC-TW' == enc['encoding']: - # We don't have the EUC-TW encoding installed, and most likely - # something is so wrong that we probably can't recover this url + # We don't have the EUC-TW encoding installed, + # and most likely + # something is so wrong that we probably can't recover + # this url url = url.decode('Big5', 'replace') else: url = url.decode(enc['encoding'], 'replace') else: url = url.decode('utf-8', 'replace') - # Some arc headers contain urls with the '\r' character, which will cause - # problems downstream when trying to process this url, so escape it. + # Some arc headers contain urls with the '\r' character, which will + # cause problems downstream when trying to process this url, so escape + # it. # While we are at it, replace other newline chars. url = url.replace('\r', '%0D') url = url.replace('\n', '%0A') - url = url.replace('\x0c', '%0C') #formfeed - url = url.replace('\x00', '%00') #null may cause problems with downstream C programs + url = url.replace('\x0c', '%0C') # formfeed + # null may cause problems with downstream C programs + url = url.replace('\x00', '%00') if self.screenshot_mode: url = u'http://web.archive.org/screenshot/' + url @@ -348,41 +357,37 @@ def get_original_url(self, record): return url # get_date() //field "b" - #___________________________________________________________________________ def get_date(self, record): - #warcs and arcs use a different date format - #consider using dateutil.parser instead + # warcs and arcs use a different date format + # consider using dateutil.parser instead if record.date.isdigit(): date_len = len(record.date) if 14 == date_len: - #arc record already has date in the format we need + # arc record already has date in the format we need return record.date elif 16 == date_len: - #some arc records have 16-digit dates: 2000082305410049 + # some arc records have 16-digit dates: 2000082305410049 return record.date[:14] elif 18 == date_len: - #some arc records have 18-digit dates: 200009180023002953 + # some arc records have 18-digit dates: 200009180023002953 return record.date[:14] elif 12 == date_len: - #some arc records have 12-digit dates: 200011201434 + # some arc records have 12-digit dates: 200011201434 return record.date + '00' elif re.match('^[a-f0-9]+$', record.date): - #some arc records have a hex string in the date field + # some arc records have a hex string in the date field return '-' - #warc record + # warc record date = datetime.strptime(record.date, "%Y-%m-%dT%H:%M:%SZ") return date.strftime("%Y%m%d%H%M%S") # get_file_name() //field "g" - #___________________________________________________________________________ def get_file_name(self, record): return self.warc_path - # is_response() - #___________________________________________________________________________ def is_response(self, content_type): if content_type is None: return False @@ -393,9 +398,7 @@ def is_response(self, content_type): return got_match - # get_new_style_checksum() //field "k" - #___________________________________________________________________________ def get_new_style_checksum(self, record): """Return a base32-encoded sha1 For revisit records, return 
the original sha1 @@ -407,13 +410,18 @@ def get_new_style_checksum(self, record): return '-' else: return digest.replace('sha1:', '') - elif 'response' == record.type and self.is_response(record.content_type): + elif 'response' == ( + record.type and self.is_response(record.content_type) + ): digest = record.get_header('WARC-Payload-Digest') - #Our patched warc-tools fabricates this header if it is not present in the record + # Our patched warc-tools fabricates this header if it is not + # present in the record return digest.replace('sha1:', '') elif 'response' == record.type and self.content is not None: - # This is an arc record. Our patched warctools fabricates the WARC-Payload-Digest - # header even for arc files so that we don't need to load large payloads in memory + # This is an arc record. Our patched warctools fabricates the + # WARC-Payload-Digest + # header even for arc files so that we don't need to load large + # payloads in memory digest = record.get_header('WARC-Payload-Digest') if digest is not None: return digest.replace('sha1:', '') @@ -425,9 +433,9 @@ def get_new_style_checksum(self, record): return base64.b32encode(h.digest()) # get_mime_type() //field "m" - #___________________________________________________________________________ def get_mime_type(self, record, use_precalculated_value=True): - """ See the WARC spec for more info on 'application/http; msgtype=response' + """ + See the WARC spec for more info on 'application/http; msgtype=response' http://archive-access.sourceforge.net/warc/warc_file_format-0.16.html#anchor7 """ @@ -440,14 +448,14 @@ def get_mime_type(self, record, use_precalculated_value=True): if record.content_type is None: mime_type = 'unk' else: - #alexa arc files use 'no-type' instead of 'unk' + # alexa arc files use 'no-type' instead of 'unk' mime_type = record.content_type.replace('no-type', 'unk') elif 'warcinfo' == record.type: mime_type = 'warc-info' elif self.screenshot_mode and 'metadata' == record.type: mime_type = record.content[0] else: - mime_type = 'warc/'+record.type + mime_type = 'warc/' + record.type try: mime_type = mime_type.decode('ascii') @@ -456,14 +464,12 @@ def get_mime_type(self, record, use_precalculated_value=True): return mime_type - # to_unicode() - #___________________________________________________________________________ @classmethod def to_unicode(self, s, charset): if isinstance(s, str): if charset is None: - #try utf-8 and hope for the best + # try utf-8 and hope for the best s = s.decode('utf-8', 'replace') else: try: @@ -473,7 +479,6 @@ def to_unicode(self, s, charset): return s # urljoin_and_normalize() - #___________________________________________________________________________ @classmethod def urljoin_and_normalize(self, base, url, charset): """urlparse.urljoin removes blank fragments (trailing #), @@ -508,24 +513,26 @@ def urljoin_and_normalize(self, base, url, charset): http://www.seomoz.org/trifecta/fetch/page/http://www.example.com/ """ - url = self.to_unicode(url, charset) + url = self.to_unicode(url, charset) - #the base url is from the arc/warc header, which doesn't specify a charset + # the base url is from the arc/warc header, which doesn't specify a + # charset base = self.to_unicode(base, 'utf-8') try: joined_url = urlparse.urljoin(base, url) except ValueError: - #some urls we find in arc files no longer parse with python 2.7, - #e.g. 'http://\x93\xe0\x90E\x83f\x81[\x83^\x93\xfc\x97\xcd.com/' + # some urls we find in arc files no longer parse with python 2.7, + # e.g. 
'http://\x93\xe0\x90E\x83f\x81[\x83^\x93\xfc\x97\xcd.com/' return '-' # We were using os.path.normpath, but had to add too many patches - # when it was doing the wrong thing, such as turning http:// into http:/ + # when it was doing the wrong thing, such as turning http:// into + # http:/ m = re.match('(https?://.+?/)', joined_url) if m: domain = joined_url[:m.end(1)] - path = joined_url[m.end(1):] + path = joined_url[m.end(1):] if path.startswith('../'): path = path[3:] norm_url = domain + re.sub('/[^/]+/\.\./', '/', path) @@ -540,12 +547,10 @@ def urljoin_and_normalize(self, base, url, charset): elif url.endswith('#') and not norm_url.endswith('#'): norm_url += '#' - #encode spaces + # encode spaces return norm_url.replace(' ', '%20') - # get_redirect() //field "r" - #___________________________________________________________________________ def get_redirect(self, record): """Aaron, Ilya, and Kenji have proposed using '-' in the redirect column unconditionally, after a discussion on Sept 5, 2012. It turns out the @@ -556,9 +561,11 @@ def get_redirect(self, record): # response_code = self.response_code # - # ## It turns out that the refresh tag is being used in both 2xx and 3xx - # ## responses, so always check both the http location header and the meta - # ## tags. Also, the java version passes spaces through to the cdx file, + # ## It turns out that the refresh tag is being used in both 2xx and + # ## 3xx + # ## responses, so always check both the http location header and the + # ## meta + # ## tags. Also, the java version passes spaces through to the cdx file # ## which might break tools that split cdx lines on whitespace. # # #only deal with 2xx and 3xx responses: @@ -574,14 +581,15 @@ def get_redirect(self, record): # #elif response_code.startswith('2'): # if self.meta_tags and 'refresh' in self.meta_tags: # redir_loc = self.meta_tags['refresh'] - # m = re.search('\d+\s*;\s*url=(.+)', redir_loc, re.I) #url might be capitalized + # m = re.search('\d+\s*;\s*url=(.+)', redir_loc, re.I) + # # url might be capitalized # if m: - # return self.urljoin_and_normalize(record.url, m.group(1), charset) + # return self.urljoin_and_normalize(record.url, m.group(1), + # charset) # # return '-' # get_response_code() //field "s" - #___________________________________________________________________________ def get_response_code(self, record, use_precalculated_value=True): if use_precalculated_value: return self.response_code @@ -596,7 +604,6 @@ def get_response_code(self, record, use_precalculated_value=True): return '-' # split_headers_and_content() - #___________________________________________________________________________ def parse_headers_and_content(self, record): """Returns a list of header lines, split with splitlines(), and the content. 
We call splitlines() here so we only split once, and so \r\n and \n are @@ -605,12 +612,13 @@ def parse_headers_and_content(self, record): if 'response' == record.type and record.content[1].startswith('HTTP'): try: - headers, content = self.crlf_pattern.split(record.content[1], 1) + headers, content = self.crlf_pattern.split( + record.content[1], 1) except ValueError: headers = record.content[1] content = None headers = headers.splitlines() - elif self.screenshot_mode and 'metadata' == record.type: + elif self.screenshot_mode and 'metadata' == record.type: headers = None content = record.content[1] else: @@ -619,9 +627,7 @@ def parse_headers_and_content(self, record): return headers, content - # should_exclude() - #___________________________________________________________________________ def should_exclude(self, surt_url): if not self.excludes: return False @@ -632,27 +638,26 @@ def should_exclude(self, surt_url): return False - # make_cdx() - #___________________________________________________________________________ def make_cdx(self): if isinstance(self.out_file, basestring): self.out_file = open(self.out_file, 'wb') - self.out_file.write(' CDX ' + self.format + '\n') #print header + self.out_file.write(' CDX ' + self.format + '\n') # print header if not self.all_records: - #filter cdx lines if --all-records isn't specified - allowed_record_types = set(['response', 'revisit']) + # filter cdx lines if --all-records isn't specified + allowed_record_types = set(['response', 'revisit']) disallowed_content_types = set(['text/dns']) stats = { 'num_records_processed': 0, - 'num_records_included': 0, - 'num_records_filtered': 0, + 'num_records_included': 0, + 'num_records_filtered': 0, } fh = ArchiveRecord.open_archive(self.file, gzip="auto", mode="r") - for (offset, record, errors) in fh.read_records(limit=None, offsets=True): + for (offset, record, errors) in fh.read_records(limit=None, + offsets=True): self.offset = offset if record: @@ -660,49 +665,61 @@ def make_cdx(self): if self.screenshot_mode: if record.type != 'metadata': continue - elif not self.all_records and (record.type not in allowed_record_types or record.content_type in disallowed_content_types): + elif not self.all_records and( + record.type not in allowed_record_types or + record.content_type in disallowed_content_types + ): continue - ### arc files from the live web proxy can have a negative content length and a missing payload - ### check the content_length from the arc header, not the computed payload size returned by record.content_length + # arc files from the live web proxy can have a negative content + # length and a missing payload + # check the content_length from the arc header, not the + # computed payload size returned by record.content_length content_length_str = record.get_header(record.CONTENT_LENGTH) - if content_length_str is not None and int(content_length_str) < 0: + if ( + content_length_str is not None and + int(content_length_str) < 0 + ): continue - self.surt = self.get_massaged_url(record, use_precalculated_value=False) + self.surt = self.get_massaged_url( + record, use_precalculated_value=False) if self.should_exclude(self.surt): stats['num_records_filtered'] += 1 continue - ### precalculated data that is used multiple times - self.headers, self.content = self.parse_headers_and_content(record) - self.mime_type = self.get_mime_type(record, use_precalculated_value=False) - self.response_code = self.get_response_code(record, use_precalculated_value=False) - self.meta_tags = 
self.parse_meta_tags(record) + # precalculated data that is used multiple times + self.headers, self.content = self.parse_headers_and_content( + record) + self.mime_type = self.get_mime_type( + record, use_precalculated_value=False) + self.response_code = self.get_response_code( + record, use_precalculated_value=False) + self.meta_tags = self.parse_meta_tags(record) s = u'' for field in self.format.split(): - if not field in self.field_map: + if field not in self.field_map: raise ParseError('Unknown field: ' + field) endpoint = self.field_map[field].replace(' ', '_') response = getattr(self, 'get_' + endpoint)(record) - #print self.offset - #print record.compressed_record_size - #print record.content_length - #print record.headers - #print len(self.content) - #print repr(record.content[1]) - #print endpoint - #print repr(response) + # print self.offset + # print record.compressed_record_size + # print record.content_length + # print record.headers + # print len(self.content) + # print repr(record.content[1]) + # print endpoint + # print repr(response) s += response + ' ' - self.out_file.write(s.rstrip().encode('utf-8')+'\n') - #record.dump() + self.out_file.write(s.rstrip().encode('utf-8') + '\n') + # record.dump() stats['num_records_included'] += 1 elif errors: raise ParseError(str(errors)) else: - pass # tail + pass # tail fh.close() @@ -712,27 +729,46 @@ def make_cdx(self): f.close() # main() -#_______________________________________________________________________________ if __name__ == '__main__': parser = OptionParser(usage="%prog [options] warc.gz [output_file.cdx]") - parser.set_defaults(format = "N b a m s k r M S V g", - use_full_path = False, - file_prefix = None, - all_records = False, - screenshot_mode = False, - exclude_list = None, - ) - - parser.add_option("--format", dest="format", help="A space-separated list of fields [default: '%default']") - parser.add_option("--use-full-path", dest="use_full_path", action="store_true", help="Use the full path of the warc file in the 'g' field") - parser.add_option("--file-prefix", dest="file_prefix", help="Path prefix for warc file name in the 'g' field." - " Useful if you are going to relocate the warc.gz file after processing it." - ) - parser.add_option("--all-records", dest="all_records", action="store_true", help="By default we only index http responses. Use this flag to index all WARC records in the file") - parser.add_option("--screenshot-mode", dest="screenshot_mode", action="store_true", help="Special Wayback Machine mode for handling WARCs containing screenshots") - parser.add_option("--exclude-list", dest="exclude_list", help="File containing url prefixes to exclude") - parser.add_option("--stats-file", dest="stats_file", help="Output json file containing statistics") + parser.set_defaults(format="N b a m s k r M S V g", + use_full_path=False, + file_prefix=None, + all_records=False, + screenshot_mode=False, + exclude_list=None, + ) + + parser.add_option("--format", + dest="format", + help="A space-separated list of fields " + "[default: '%default']") + parser.add_option("--use-full-path", + dest="use_full_path", + action="store_true", + help="Use the full path of the warc file in the 'g' " + "field") + parser.add_option("--file-prefix", + dest="file_prefix", + help="Path prefix for warc file name in the 'g' field." + " Useful if you are going to relocate the warc.gz " + "file after processing it." 
+ ) + parser.add_option("--all-records", + dest="all_records", + action="store_true", + help="By default we only index http responses." + "Use this flag to index all WARC records in the file") + parser.add_option("--screenshot-mode", + dest="screenshot_mode", + action="store_true", + help="Special Wayback Machine mode for handling " + "WARCs containing screenshots") + parser.add_option("--exclude-list", dest="exclude_list", + help="File containing url prefixes to exclude") + parser.add_option("--stats-file", dest="stats_file", + help="Output json file containing statistics") (options, input_files) = parser.parse_args(args=sys.argv[1:]) @@ -745,11 +781,11 @@ def make_cdx(self): cdx_writer = CDX_Writer(input_files[0], input_files[1], format=options.format, - use_full_path = options.use_full_path, - file_prefix = options.file_prefix, - all_records = options.all_records, - screenshot_mode = options.screenshot_mode, - exclude_list = options.exclude_list, - stats_file = options.stats_file, - ) + use_full_path=options.use_full_path, + file_prefix=options.file_prefix, + all_records=options.all_records, + screenshot_mode=options.screenshot_mode, + exclude_list=options.exclude_list, + stats_file=options.stats_file, + ) cdx_writer.make_cdx() diff --git a/warctools/__init__.py b/warctools/__init__.py new file mode 100644 index 0000000..cb7aa56 --- /dev/null +++ b/warctools/__init__.py @@ -0,0 +1,13 @@ +from .record import ArchiveRecord +from .warc import WarcRecord +from .arc import ArcRecord +from . import record, warc, arc + +__all__= [ + 'ArchiveRecord', + 'ArcRecord', + 'WarcRecord', + 'record', + 'warc', + 'arc' +] diff --git a/warctools/arc.py b/warctools/arc.py new file mode 100644 index 0000000..2ba8d15 --- /dev/null +++ b/warctools/arc.py @@ -0,0 +1,288 @@ +"""An object to represent arc records""" + +import sys +import re +import base64 +import hashlib +import zlib + +from .record import ArchiveRecord,ArchiveParser +from .stream import open_record_stream +from .archive_detect import register_record_type + +# URLIP-addressArchive-dateContent-type +#Result-codeChecksumLocation OffsetFilename +#Archive-length +# +@ArchiveRecord.HEADERS( + URL='URL', + IP='IP-address', + DATE='Archive-date', + CONTENT_TYPE = 'Content-type', + CONTENT_LENGTH = 'Archive-length', + RESULT_CODE = 'Result-code', + CHECKSUM = 'Checksum', + LOCATION = 'Location', + OFFSET = 'Offset', + FILENAME = 'Filename', +) +class ArcRecord(ArchiveRecord): + def __init__(self, headers=None, content=None, errors=None): + ArchiveRecord.__init__(self,headers,content,errors) + + @property + def type(self): + return "response" + + def _write_to(self, out, nl): + pass + + @classmethod + def make_parser(self): + return ArcParser() + + +class ArcRecordHeader(ArcRecord): + def __init__(self, headers=None, content=None, errors=None, version=None, raw_headers=None): + ArcRecord.__init__(self,headers,content,errors) + self.version = version + self.raw_headers = raw_headers + @property + def type(self): + return "filedesc" + def raw(self): + return "".join(self.raw_headers) + self.content[1] + +def rx(pat): + return re.compile(pat,flags=re.IGNORECASE) + +nl_rx=rx('^\r\n|\r|\n$') +length_rx = rx('^'+ArcRecord.CONTENT_LENGTH+'$') +type_rx = rx('^'+ArcRecord.CONTENT_TYPE+'$') + +class ArcParser(ArchiveParser): + def __init__(self): + self.version = 0 + # we don't know which version to parse initially - a v1 or v2 file + # so we read the filedesc because the order and number of the headers change + # between versions. + + # question? 
will we get arc fragments? + # should we store both headers & detect records by header length? + # if we don't know + + self.headers = [] + self.trailing_newlines = 0 + + #raj: alexa arc files don't always have content-type in header + self.short_headers = [ArcRecord.URL, ArcRecord.IP, ArcRecord.DATE, ArcRecord.CONTENT_LENGTH] + + def parse(self, stream, offset): + record = None + content_type = None + content_length = None + try: + line = stream.readline() + except zlib.error: + #raj: some ARC files contain trailing padding zeros + #see DR_crawl22.20030622054102-c/DR_crawl22.20030622142039.arc.gz for an example + return (None,(), offset) + while not line.rstrip(): + if not line: + return (None,(), offset) + self.trailing_newlines-=1 + if offset: + offset += len(line) #rajbot + line = stream.readline() + + while line.endswith('\r'): + #raj: some ARC record headers contain a url with a '\r' character. + #The record header should end in \n, but we may also encounter + #malformed header lines that end with \r, so we need to see of we + #read the whole header, or just part of the url. + if not re.search('(?:\d{1,3}\.){3}\d{1,3} \d{14} \S+ \d+$', line): + line += stream.readline() + else: + break + + if line.startswith('filedesc:'): + raw_headers = [] + raw_headers.append(line) + # read headers named in body of record + # to assign names to header, to read body of record + arc_version_line = stream.readline() + raw_headers.append(arc_version_line) + arc_names_line = stream.readline() + raw_headers.append(arc_names_line) + + arc_version=arc_version_line.strip() + + # configure parser instance + self.version = arc_version.split()[0] + self.headers = arc_names_line.strip().split() + + # raj: some v1 ARC files are incorrectly sending a v2 header names line + if arc_names_line == 'URL IP-address Archive-date Content-type Result-code Checksum Location Offset Filepath Archive-length\n': + if arc_version == '1 0 InternetArchive' and 5 == len(line.split(' ')): + self.headers = ['URL', 'IP-address', 'Archive-date', 'Content-type', 'Archive-length'] + + # now we have read header field in record body + # we can extract the headers from the current record, + # and read the length field + + # which is in a different place with v1 and v2 + + # read headers + arc_headers = self.get_header_list(line.strip().split()) + + # extract content, ignoring header lines parsed already + content_type, content_length, errors = self.get_content_headers(arc_headers) + + content_length = content_length - len(arc_version_line) - len(arc_names_line) + + record = ArcRecordHeader(headers = arc_headers, version=arc_version, errors=errors, raw_headers=raw_headers) + + else: + if not self.headers: + #raj: some arc files are missing the filedesc:// line + #raise StandardHeader('missing filedesc') + self.version = '1' + self.headers = ['URL', 'IP-address', 'Archive-date', 'Content-type', 'Archive-length'] + + #raj: change the call to split below to only split on space (some arcs have a \x0c formfeed character in the url) + headers = self.get_header_list(line.strip().split(' ')) + content_type, content_length, errors = self.get_content_headers(headers) + + record = ArcRecord(headers = headers, errors=errors) + + ### raj: + ### We do this because we don't want to read large records into memory, + ### since this was exhasting memory and crashing for large payloads. 
+ sha1_digest = None + if record.url.startswith('http'): + parsed_http_header = False + sha1_digest = hashlib.sha1() + else: + #This isn't a http response so pretend we already parsed the http header + parsed_http_header = True + + line = None + + if content_length > 0: ###raj: some arc files have a negative content_length and no payload. + content=[] + length = 0 + + should_skip_content = False + if content_length > ArchiveParser.content_length_limit: + should_skip_content = True + + while length < content_length: + if not parsed_http_header: + line = stream.readline() + else: + bytes_to_read = min(content_length-length, 1024) + line = stream.read(bytes_to_read) #TODO: rename variable. may be more than just one line + if not line: + #print 'no more data' + break + + if should_skip_content: + if not parsed_http_header: + content.append(line) + else: + content.append(line) + + length+=len(line) + + if sha1_digest: + if parsed_http_header: + if length <= content_length: + sha1_digest.update(line) + else: + sha1_digest.update(line[:-(length-content_length)]) + + if not parsed_http_header: + if nl_rx.match(line): + parsed_http_header = True + + if sha1_digest: + sha1_str = 'sha1:'+base64.b32encode(sha1_digest.digest()) + record.headers.append(('WARC-Payload-Digest', sha1_str)) + + content="".join(content) + + ### note the content_length+1 below, which is not in the WARC parser + ### the +1 might be a bug + #content, line = content[0:content_length], content[content_length+1:] + content = content[0:content_length] + if length > content_length: + #line is the last line we read + trailing_chars = line[-(length-content_length):] #note that we removed the +1 from above + else: + trailing_chars = '' + + record.content = (content_type, content) + + if trailing_chars: + record.error('trailing data at end of record', line) + if trailing_chars == '': + self.trailing_newlines = 1 + + return (record, (), offset) + + def trim(self, stream): + return () + + def get_header_list(self, values): + num_values = len(values) + + #raj: some headers contain urls with unescaped spaces + if num_values > 5: + if re.match('^(?:\d{1,3}\.){3}\d{1,3}$', values[-4]) and re.match('^\d{14}$', values[-3]) and re.match('^\d+$', values[-1]): + values = ['%20'.join(values[0:-4]), values[-4], values[-3], values[-2], values[-1]] + num_values = len(values) + + if 4 == num_values: + #raj: alexa arc files don't always have content-type in header + return zip(self.short_headers, values) + elif 5 == num_values: + #normal case + #raj: some old alexa arcs have ip-address and date transposed in the header + if re.match('^\d{14}$', values[1]) and re.match('^(?:\d{1,3}\.){3}\d{1,3}$', values[2]): + values[1], values[2] = values[2], values[1] + + return zip(self.headers, values) + elif 6 == num_values: + #raj: some old alexa arcs have "content-type; charset" in the header + v = values[0:4]+values[5:] + v[3] = v[3].rstrip(';') + return zip(self.headers, v) + else: + raise Exception('invalid number of header fields') + + @staticmethod + def get_content_headers(headers): + content_type = None + content_length = None + errors = [] + + for name,value in headers: + if type_rx.match(name): + if value: + content_type = value + else: + errors.append(('invalid header',name,value)) + elif length_rx.match(name): + try: + content_length = int(value) + except ValueError: + errors.append(('invalid header',name,value)) + + return content_type, content_length, errors + + +register_record_type(re.compile('^filedesc://'), ArcRecord) + +#raj: some arc files are 
missing the filedesc:// line +url_record_regex = re.compile('^https?://\S+ (?:\d{1,3}\.){3}\d{1,3} \d{14} \S+ \d+$') +register_record_type(url_record_regex, ArcRecord) diff --git a/warctools/archive_detect.py b/warctools/archive_detect.py new file mode 100644 index 0000000..df09f2d --- /dev/null +++ b/warctools/archive_detect.py @@ -0,0 +1,27 @@ +import gzip + +archive_types = [] + +def is_gzip_file(file_handle): + signature = file_handle.read(2) + file_handle.seek(-len(signature),1) + return signature == '\x1f\x8b' + +def guess_record_type(file_handle): + offset = file_handle.tell() + if is_gzip_file(file_handle): + nfh=gzip.GzipFile(fileobj=file_handle) + else: + nfh=file_handle + + line = nfh.readline() + file_handle.seek(offset) + for rx, record in archive_types: + if rx.match(line): + return record + + else: + return None + +def register_record_type(rx, record): + archive_types.append((rx,record)) diff --git a/warctools/log.py b/warctools/log.py new file mode 100644 index 0000000..39e2c7f --- /dev/null +++ b/warctools/log.py @@ -0,0 +1,9 @@ +import sys + +__all__ = ['debug'] + +debug = lambda:None + +if __debug__: + def debug(*args): + print >> sys.stderr, 'WARCTOOLS',args diff --git a/warctools/record.py b/warctools/record.py new file mode 100644 index 0000000..736374a --- /dev/null +++ b/warctools/record.py @@ -0,0 +1,128 @@ +"""a skeleton class for archive records""" + +from gzip import GzipFile +import re + +from .stream import open_record_stream + +strip = re.compile(r'[^\w\t \|\\\/]') + +def add_headers(**kwargs): + """a useful helper for defining header names in record formats""" + def _add_headers(cls): + for k,v in kwargs.iteritems(): + setattr(cls,k,v) + cls._HEADERS = kwargs.keys() + return cls + return _add_headers + +class ArchiveParser(object): + """ methods parse, and trim """ + #To avoid exhausing memory while reading large payloads, don't + #store large records. + content_length_limit = 5 * 1024 * 1024 + + + +@add_headers(DATE='Date', CONTENT_TYPE='Type', CONTENT_LENGTH='Length', TYPE='Type',URL='Url') +class ArchiveRecord(object): + """An archive record has some headers, maybe some content and + a list of errors encountered. record.headers is a list of tuples (name, + value). 
errors is a list, and content is a tuple of (type, data)""" + def __init__(self, headers=None, content=None, errors=None): + self.headers = headers if headers else [] + self.content = content if content else (None, "") + self.errors = errors if errors else [] + + self.compressed_record_size = None ###rajbot + + HEADERS=staticmethod(add_headers) + + @property + def date(self): + return self.get_header(self.DATE) + + def error(self, *args): + self.errors.append(args) + + @property + def type(self): + return self.get_header(self.TYPE) + + @property + def content_type(self): + return self.content[0] + + @property + def content_length(self): + return len(self.content[1]) + + @property + def url(self): + return self.get_header(self.URL) + + def get_header(self, name): + for k,v in self.headers: + if name == k: + return v + + def set_header(self, name, value): + self.headers = [(k,v) for (k,v) in self.headers if k != name] + self.headers.append((name, value)) + + def dump(self, content=True): + print 'Headers:' + for (h,v) in self.headers: + print '\t%s:%s'%(h,v) + if content and self.content: + print 'Content Headers:' + content_type, content_body = self.content + print '\t',self.CONTENT_TYPE,':',content_type + print '\t',self.CONTENT_LENGTH,':',len(content_body) + print 'Content:' + ln = min(1024, len(content_body)) + print '\t', strip.sub(lambda x:'\\x%00X'%ord(x.group()),content_body[:ln]) + print '\t...' + print + else: + print 'Content: none' + print + print + if self.errors: + print 'Errors:' + for e in self.errors: + print '\t', e + + def write_to(self, out, newline='\x0D\x0A', gzip=False): + if gzip: + out=GzipFile(fileobj=out) + self._write_to(out, newline) + if gzip: + out.flush() + out.close() + + + def _write_to(self, out, newline): + raise AssertionError, 'this is bad' + + + ### class methods for parsing + @classmethod + def open_archive(cls , filename=None, file_handle=None, mode="rb+", gzip="auto"): + """Generically open an archive - magic autodetect""" + if cls is ArchiveRecord: + cls = None # means guess + return open_record_stream(cls, filename, file_handle, mode, gzip) + + @classmethod + def make_parser(self): + """Reads a (w)arc record from the stream, returns a tuple (record, errors). + Either records is null or errors is null. Any record-specific errors are + contained in the record - errors is only used when *nothing* could be parsed""" + raise StandardError + + + + + + diff --git a/warctools/stream.py b/warctools/stream.py new file mode 100644 index 0000000..d394b99 --- /dev/null +++ b/warctools/stream.py @@ -0,0 +1,259 @@ +"""Read records from normal file and compressed file""" + +import zlib +import gzip +import re + +from .log import debug +from .archive_detect import is_gzip_file, guess_record_type + +def open_record_stream(record_class=None, filename=None, file_handle=None, mode="rb+", gzip="auto"): + """Can take a filename or a file_handle. Normally called indirectly from + A record class i.e WarcRecord.open_archive. 
If the first parameter is None, will try to guess""" + + if file_handle is None: + file_handle = open(filename, mode=mode) + else: + if not filename: + filename = file_handle.name + + if record_class == None: + record_class = guess_record_type(file_handle) + + if record_class == None: + raise StandardError('Failed to guess compression') + + record_parser = record_class.make_parser() + + if gzip == 'auto': + if is_gzip_file(file_handle): + gzip = 'record' + #debug('autodetect: record gzip') + else: + # assume uncompressed file + #debug('autodetected: uncompressed file') + gzip = None + + + if gzip=='record': + return GzipRecordStream(file_handle, record_parser) + elif gzip=='file': + return GzipFileStream(file_handle, record_parser) + else: + return RecordStream(file_handle, record_parser) + + +class RecordStream(object): + """A readable/writable stream of Archive Records. Can be iterated over + or read_records can give more control, and potentially offset information. + """ + def __init__(self, file_handle, record_parser): + self.fh = file_handle + self.record_parser = record_parser + self._parser = None + + def seek(self, offset, pos=0): + """Same as a seek on a file""" + self.fh.seek(offset,pos) + + def read_records(self, limit=1, offsets=True): + """Yield a tuple of (offset, record, errors) where + Offset is either a number or None. + Record is an object and errors is an empty list + or record is none and errors is a list""" + + nrecords = 0 + while nrecords < limit or limit is None: + offset, record, errors = self._read_record(offsets) + nrecords+=1 + yield (offset, record,errors) + if not record: + break + + def __iter__(self): + while True: + offset, record, errors = self._read_record(offsets=False) + if record: + yield record + elif errors: + raise StandardError('Errors while decoding '+",".join(errors)) + else: + break + + def _read_record(self, offsets): + """overridden by sub-classes to read individual records""" + offset = self.fh.tell() if offsets else None + record, errors, offset = self.record_parser.parse(self.fh, offset) + if record: + #TODO: use compressed_record_size to store size of UNcompressed record for now, rename later + record.compressed_record_size = self.fh.tell() - offset + + return offset, record, errors + + def write(self, record): + record.write_to(self) + + def close(self): + self.fh.close() + +class GzipRecordStream(RecordStream): + """A stream to read/write concatted file made up of gzipped archive records""" + def __init__(self, file_handle, record_parser): + RecordStream.__init__(self,file_handle, record_parser) + self.gz = None + + ###rajbot + self.next_record = None + self.next_errors = None + self.next_offset = None + + def _read_record(self, offsets): + """rajbot: restructure this function to call parse() twice, once + to read the record, and once to read possible trailing bytes. + This is to ensure that the compressed_record_size is correct. 
+ """ + errors = [] + + if self.next_record: + record = self.next_record + errors = self.next_errors + offset = self.next_offset if offsets else None + + self.next_record = None + self.next_errors = None + self.next_offset = None + + return offset, record, errors + + + offset_ = self.fh.tell() + self.gz = GzipRecordFile(self.fh) + record, r_errors, tmp_offset = self.record_parser.parse(self.gz, offset=None) + + next_offset_ = self.fh.tell() + next_record, next_errors, tmp_next_offset = self.record_parser.parse(self.gz, offset=None) + + if not next_record: + if record: + record.compressed_record_size = self.fh.tell() - offset_ + else: + next_record.error('multiple warc records in gzip record file') + if record: + record.compressed_record_size = next_offset_ - offset_ + next_record.compressed_record_size = self.fh.tell() - next_offset_ + self.next_record = next_record + self.next_errors = next_errors + self.next_offset = next_offset_ + + self.gz.close() + + offset = offset_ if offsets else None + errors.extend(r_errors) + return offset, record, errors + + +class GzipFileStream(RecordStream): + """A stream to read/write gzipped file made up of all archive records""" + def __init__(self, file_handle, record): + RecordStream.__init__(self,gzip.GzipFile(fileobj=file_handle), record) + def _read_record(self, offsets): + # no real offsets in a gzipped file (no seperate records) + return RecordStream._read_record(self, False) + + + +### record-gzip handler, based on zlib +### implements readline() access over a a single +### gzip-record. must be re-created to read another record + + +CHUNK_SIZE=1024 # the size to read in, make this bigger things go faster. +line_rx=re.compile('^(?P^[^\r\n]*(?:\r\n|\r(?!\n)|\n))(?P.*)$',re.DOTALL) + +class GzipRecordFile(object): + """A file like class providing 'readline' over catted gzip'd records""" + def __init__(self, fh): + self.fh = fh + self.buffer="" + self.z = zlib.decompressobj(16+zlib.MAX_WBITS) + self.done = False + + + def _getline(self): + if self.buffer: + #a,nl,b + match=line_rx.match(self.buffer) + #print match + # print 'split:', split[0],split[1], len(split[2]) + if match: + output = match.group('line') + self.buffer = ""+match.group('tail') + return output + elif self.done: + output = self.buffer + self.buffer = "" + + return output + + def readline(self): + while True: + output = self._getline() + if output: + return output + + if self.done: + return "" + + #print 'read chunk at', self.fh.tell(), self.done + chunk = self.fh.read(CHUNK_SIZE) + out = self.z.decompress(chunk) + + if out: + #rajbot: if the decompressed string ends with a \r, then + #read another chunk in case a \n is next. We need to do + #this because the line_rx pattern allows \r to be a valid + #newline, which causes the following \n to become a blank + #line which might prematurely end parsing of warc headers. 
+ if out.endswith('\r') and not self.z.unused_data: + next_chunk = self.fh.read(CHUNK_SIZE) + next_out = self.z.decompress(next_chunk) + if next_out is not None: + out += next_out + + self.buffer+=out + + if self.z.unused_data: + #print 'unused', len(self.z.unused_data) + self.fh.seek(-len(self.z.unused_data),1) + self.done=True + continue + if not chunk: + self.done = True + continue + + def read(self, num_bytes): + # add bytes to self.buff if the current buffer doesn't have enough + # data in it, and we have not exhaused the compressed stream + while (len(self.buffer) < num_bytes) and not self.done: + chunk = self.fh.read(CHUNK_SIZE) + d_bytes = self.z.decompress(chunk) + if d_bytes: + self.buffer += d_bytes + + if self.z.unused_data: + #print 'unused', len(self.z.unused_data) + self.fh.seek(-len(self.z.unused_data),1) + self.done=True + break + if not chunk: + self.done = True + break + + out = self.buffer[:num_bytes] + self.buffer = self.buffer[num_bytes:] + + return out + + def close(self): + if self.z: + self.z.flush() diff --git a/warctools/warc.py b/warctools/warc.py new file mode 100644 index 0000000..8c933bc --- /dev/null +++ b/warctools/warc.py @@ -0,0 +1,454 @@ +"""An object to represent warc records, using the abstract record in record.py""" + +import re +import base64 +import hashlib +from .record import ArchiveRecord,ArchiveParser +from .archive_detect import register_record_type + +bad_lines = 5 # when to give up looking for the version stamp + +@ArchiveRecord.HEADERS( + DATE='WARC-Date', + TYPE = 'WARC-Type', + ID = 'WARC-Record-ID', + CONCURRENT_TO = 'WARC-Concurrent-To', + REFERS_TO = 'WARC-Refers-To', + CONTENT_LENGTH = 'Content-Length', + CONTENT_TYPE = 'Content-Type', + URL='WARC-Target-URI', + BLOCK_DIGEST='WARC-Block-Digest', + IP_ADDRESS='WARC-IP-Address', + FILENAME='WARC-Filename', + WARCINFO_ID='WARC-Warcinfo-ID', + PAYLOAD_DIGEST = 'WARC-Payload-Digest', +) +class WarcRecord(ArchiveRecord): + VERSION="WARC/1.0" + VERSION18="WARC/0.18" + VERSION17="WARC/0.17" + RESPONSE="response" + REQUEST="request" + METADATA="metadata" + CONVERSION="conversion" + WARCINFO="warcinfo" + + def __init__(self, version=VERSION, headers=None, content=None, errors=None): + ArchiveRecord.__init__(self,headers,content,errors) + self.version = version + + @property + def id(self): + return self.get_header(self.ID) + + def _write_to(self, out, nl): + """WARC Format: + VERSION NL + (Key: Value NL)* + NL + CONTENT NL + NL + + don't write multi line headers + """ + out.write(self.version) + out.write(nl) + for k,v in self.headers: + if k not in (self.CONTENT_TYPE, self.CONTENT_LENGTH, self.BLOCK_DIGEST): + out.write(k) + out.write(": ") + out.write(v) + out.write(nl) + content_type, content_buffer = self.content + content_buffer=buffer(content_buffer) + if content_type: + out.write(self.CONTENT_TYPE) + out.write(": ") + out.write(content_type) + out.write(nl) + if content_buffer is None: + content_buffer="" + + content_length = len(content_buffer) + out.write(self.CONTENT_LENGTH) + out.write(": ") + out.write(str(content_length)) + out.write(nl) + + block_hash = hashlib.sha256() + block_hash.update(content_buffer) + + digest= "sha256:%s"%block_hash.hexdigest() + + out.write(self.BLOCK_DIGEST) + out.write(": ") + out.write(digest) + out.write(nl) + + # end of header blank nl + out.write(nl) + if content_buffer: + out.write(content_buffer[:content_length]) + out.write(nl) + out.write(nl) + out.flush() + + def repair(self): + pass + + def validate(self): + return self.errors + + @classmethod + 
def make_parser(self): + return WarcParser() + +def rx(pat): + return re.compile(pat,flags=re.IGNORECASE) + +version_rx = rx(r'^(?P.*?)(?P\s*WARC/(?P.*?))' '(?P\r\n|\r|\n)\\Z') +# a header is key: value plus any following lines with leading whitespace +header_rx = rx(r'^(?P.*?):\s?(?P.*?)' '(?P\r\n|\r|\n)\\Z') +value_rx = rx(r'^\s+(?P.+?)' '(?P\r\n|\r|\n)\\Z') +nl_rx=rx('^(?P\r\n|\r|\n\\Z)') +length_rx = rx('^'+WarcRecord.CONTENT_LENGTH+'$') +type_rx = rx('^'+WarcRecord.CONTENT_TYPE+'$') + +required_headers = set(( + WarcRecord.TYPE.lower(), + WarcRecord.ID.lower(), + WarcRecord.CONTENT_LENGTH.lower(), + WarcRecord.DATE.lower(), +)) + +class WarcParser(ArchiveParser): + KNOWN_VERSIONS=set(('1.0', '0.17', '0.18')) + def __init__(self): + self.trailing_newlines = 0 + + def parse(self,stream, offset): + """Reads a warc record from the stream, returns a tuple (record, errors). + Either records is null or errors is null. Any record-specific errors are + contained in the record - errors is only used when *nothing* could be parsed""" + errors = [] + version = None + # find WARC/.* + line = stream.readline() + newlines = self.trailing_newlines + if newlines > 0: + while line: + match = nl_rx.match(line) + if match and newlines > 0: + if offset is not None: offset+=len(line) + newlines-=1 + if match.group('nl') != '\x0d\x0a': + errors.append(('incorrect trailing newline', match.group('nl'))) + line = stream.readline() + if newlines == 0: + break + else: + break + + if newlines > 0: + errors+=('less than two terminating newlines at end of previous record, missing', newlines) + + while line: + match = version_rx.match(line) + + if match: + version = match.group('version') + if offset is not None: offset+=len(match.group('prefix')) + break + else: + if offset is not None: offset+=len(line) + if not nl_rx.match(line): + errors.append(('ignored line', line)) + if len(errors) > bad_lines: + errors.append(('too many errors, giving up hope',)) + return (None,errors, offset) + line = stream.readline() + if not line: + if version: + errors.append('warc version but no headers', version) + self.trailing_newlines = 0 + return (None, errors, offset) + if line: + content_length = 0 + content_type = None + + record = WarcRecord(errors=errors, version=version) + + + if match.group('nl') != '\x0d\x0a': + record.error('incorrect newline in version', match.group('nl')) + + if match.group('number') not in self.KNOWN_VERSIONS: + record.error('version field is not known (%s)'%(",".join(self.KNOWN_VERSIONS)), match.group('number')) + + + prefix = match.group('prefix') + + if prefix: + record.error('bad prefix on WARC version header', prefix) + + #Read headers + line = stream.readline() + while line and not nl_rx.match(line): + + #print 'header', repr(line) + match = header_rx.match(line) + if match: + if match.group('nl') != '\x0d\x0a': + record.error('incorrect newline in header', match.group('nl')) + name = match.group('name').strip() + value = [match.group('value').strip()] + #print 'match',name, value + + line = stream.readline() + match = value_rx.match(line) + while match: + #print 'follow', repr(line) + if match.group('nl') != '\x0d\x0a': + record.error('incorrect newline in follow header',line, match.group('nl')) + value.append(match.group('value').strip()) + line = stream.readline() + match = value_rx.match(line) + + value = " ".join(value) + + if type_rx.match(name): + if value: + content_type = value + else: + record.error('invalid header',name,value) + elif length_rx.match(name): + try: + #print name, value + 
content_length = int(value) + #print content_length + except ValueError: + record.error('invalid header',name,value) + else: + record.headers.append((name,value)) + + # have read blank line following headers + + # read content + + ### rajbot: if the WARC-Payload-Digest is not present, fabricate it. + ### We do this because we don't want to read large records into memory, + ### since this was exhasting memory and crashing for large payloads. + sha1_digest = None + if 'response' == record.type and re.match('^application/http;\s*msgtype=response$', content_type): + parsed_http_header = False + digest = record.get_header(WarcRecord.PAYLOAD_DIGEST) + if digest is None: + sha1_digest = hashlib.sha1() + else: + #This isn't a http response so pretend we already parsed the http header + parsed_http_header = True + + if content_length is not None: + if content_length > 0: + content=[] + length = 0 + + should_skip_content = False + if content_length > ArchiveParser.content_length_limit: + should_skip_content = True + + while length < content_length: + if not parsed_http_header: + line = stream.readline() + #print 'header:', line + else: + bytes_to_read = min(content_length-length, 1024) + line = stream.read(bytes_to_read) #TODO: rename variable. may be more than just one line + #line = stream.readline() + #print 'line:', repr(line) + if not line: + #print 'no more data' + break + + if should_skip_content: + if not parsed_http_header: + content.append(line) + else: + content.append(line) + + length+=len(line) + + if sha1_digest: + if parsed_http_header: + if length <= content_length: + sha1_digest.update(line) + else: + sha1_digest.update(line[:-(length-content_length)]) + + if not parsed_http_header: + if nl_rx.match(line): + parsed_http_header = True + + #print length, content_length, line + #else: + # print 'last line of content', repr(line) + if sha1_digest: + sha1_str = 'sha1:'+base64.b32encode(sha1_digest.digest()) + record.headers.append((WarcRecord.PAYLOAD_DIGEST, sha1_str)) + + content="".join(content) + + if length > content_length: + #line is the last line we read + trailing_chars = line[-(length-content_length):] + else: + trailing_chars = '' + + #content, line = content[0:content_length], content[content_length:] + content = content[0:content_length] + + #if len(content)!= content_length: + if length < content_length: + record.error('content length mismatch (is, claims)', length, content_length) + + record.content = (content_type, content) + + if nl_rx.match(trailing_chars): + self.trailing_newlines = 1 + else: + self.trailing_newlines = 2 + + else: + record.error('missing header', WarcRecord.CONTENT_LENGTH) + self.trailing_newlines = 2 + + # Fixed: READLINE BUG - eats trailing terminating newlines when content doesn't have a \n + + #print 'read content', repr(line) + # have read trailing newlines + + # check mandatory headers + # WARC-Type + # WARC-Date WARC-Record-ID Content-Length + + # ignore mandatory newlines for now + # because they are missing. 
+ # instead we trim a number of them off the next + # parse + + # we can't re-wind easily without wrapping + # every file handle + + # not brilliant but hey-ho + + + + + return (record, (), offset) + + def trim(self, stream): + """read the end of the file""" + newlines = self.trailing_newlines + self.trailing_newlines = 0 + errors = [] + if newlines: + line = stream.readline() + while line: + #print 'trimming', repr(line) + match = nl_rx.match(line) + if match: + if match.group('nl') != '\x0d\x0a': + errors.append(('incorrect trailing newline', match.group('nl'))) + newlines-=1 + #print 'newline' + if newlines == 0: + break + + else: + #print 'line', line, newlines + newlines = 0 + errors.append(('trailing data after content', line)) + line = stream.readline() + if newlines > 0: + errors+=('less than two terminating newlines at end of record, missing', newlines) + + return errors + + + +blank_rx = rx(r'^$') +register_record_type(version_rx, WarcRecord) +register_record_type(blank_rx, WarcRecord) + +def make_response(id, date, url, content, request_id): + headers = [ + (WarcRecord.TYPE, WarcRecord.RESPONSE), + (WarcRecord.ID, id), + (WarcRecord.DATE, date), + (WarcRecord.URL, url), + + ] + if request_id: + headers.append((WarcRecord.CONCURRENT_TO, request_id)) + + record=WarcRecord(headers=headers, content=content) + + return record + +def make_request(request_id, date, url, content, response_id): + headers = [ + (WarcRecord.TYPE, WarcRecord.REQUEST), + (WarcRecord.ID, request_id), + (WarcRecord.DATE, date), + (WarcRecord.URL, url), + + ] + if response_id: + headers.append((WarcRecord.CONCURRENT_TO, response_id)) + + record=WarcRecord(headers=headers, content=content) + + return record + +def make_metadata(meta_id, date, content, concurrent_to=None, url=None): + headers = [ + (WarcRecord.TYPE, WarcRecord.METADATA), + (WarcRecord.ID, meta_id), + (WarcRecord.DATE, date), + + ] + if concurrent_to: + headers.append((WarcRecord.CONCURRENT_TO, concurrent_to)) + + if url: + headers.append((WarcRecord.URL, url)) + + record=WarcRecord(headers=headers, content=content) + + return record + + +def make_conversion(conv_id, date, content, refers_to=None, url=None): + headers = [ + (WarcRecord.TYPE, WarcRecord.CONVERSION), + (WarcRecord.ID, conv_id), + (WarcRecord.DATE, date), + + ] + if refers_to: + headers.append((WarcRecord.REFERS_TO, refers_to)) + + if url: + headers.append((WarcRecord.URL, url)) + + record=WarcRecord(headers=headers, content=content) + + return record + + + +def warc_datetime_str(d): + s = d.isoformat() + if '.' in s: + s = s[:s.find('.')] + return s +'Z'
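For reference, a minimal usage sketch of the pieces this patch adds; it is not part of the patch itself. 'example.warc.gz' and 'example.cdx' are placeholder filenames, and the calls mirror what CDX_Writer.make_cdx() does with the bundled warctools API (ArchiveRecord.open_archive / read_records):

    from warctools import ArchiveRecord

    fh = ArchiveRecord.open_archive('example.warc.gz', gzip='auto', mode='r')
    for offset, record, errors in fh.read_records(limit=None, offsets=True):
        if record:
            # read_records() yields every record; make_cdx() keeps only
            # 'response' and 'revisit' records unless --all-records is given.
            print offset, record.type, record.content_type, record.url
        elif errors:
            raise Exception('record could not be parsed: %s' % str(errors))
    fh.close()

The same WARC can be indexed end to end with the writer class itself, using the defaults the command line sets up:

    from cdx_writer import CDX_Writer

    cdx_writer = CDX_Writer('example.warc.gz', 'example.cdx')
    cdx_writer.make_cdx()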