Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add JSON export + BigQuery compatibility #63

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions es2csv.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import codecs
import json
import os
import time
import json
import codecs
from functools import wraps

import elasticsearch
import progressbar
from backports import csv
from functools import wraps


FLUSH_BUFFER = 1000 # Chunk of docs to flush in temp file
CONNECTION_TIMEOUT = 120
Expand Down Expand Up @@ -52,6 +52,8 @@ def __init__(self, opts):
self.scroll_time = '30m'

self.csv_headers = list(META_FIELDS) if self.opts.meta_fields else []
self.header_delimiter = self.opts.header_delimiter or '.'
self.big_query_compat = self.opts.big_query
self.tmp_file = '{}.tmp'.format(opts.output_file)

@retry(elasticsearch.exceptions.ConnectionError, tries=TIMES_TO_TRY)
Expand Down Expand Up @@ -171,7 +173,9 @@ def next_scroll(scroll_id):
bar.finish()

def flush_to_file(self, hit_list):
def to_keyvalue_pairs(source, ancestors=[], header_delimeter='.'):
header_delimiter = self.header_delimiter
big_query_compat = self.big_query_compat
def to_keyvalue_pairs(source, ancestors=[]):
def is_list(arg):
return type(arg) is list

Expand All @@ -188,13 +192,19 @@ def is_dict(arg):
else:
[to_keyvalue_pairs(item, ancestors + [str(index)]) for index, item in enumerate(source)]
else:
header = header_delimeter.join(ancestors)
header = header_delimiter.join(ancestors)
if header not in self.csv_headers:
self.csv_headers.append(header)
try:
out[header] = '{}{}{}'.format(out[header], self.opts.delimiter, source)
except:
out[header] = source
if big_query_compat == False:
try:
out[header] = '{}{}{}'.format(out[header], self.opts.delimiter, source)
except:
out[header] = source
else:
try:
out[header.replace('@', '_')] = '{}{}{}'.format(out[header], self.opts.delimiter, source)
except:
out[header.replace('@', '_')] = source

with codecs.open(self.tmp_file, mode='a', encoding='utf-8') as tmp_file:
for hit in hit_list:
Expand Down Expand Up @@ -232,6 +242,10 @@ def write_to_csv(self):
print('There is no docs with selected field(s): {}.'.format(','.join(self.opts.fields)))
os.remove(self.tmp_file)

def write_to_json(self):
    # Publish the scroll results by promoting the temp file to the
    # requested output path in a single rename (no re-read/re-write).
    # NOTE(review): presumably the temp file already holds line-delimited
    # JSON written during scrolling — confirm against flush_to_file.
    if not self.num_results:
        return
    os.rename(self.tmp_file, self.opts.output_file)

def clean_scroll_ids(self):
try:
self.es_conn.clear_scroll(body=','.join(self.scroll_ids))
Expand Down
11 changes: 9 additions & 2 deletions es2csv_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
es2csv -q '*' -t dev prod -u http://login:[email protected]:6666/es/ -o ~/file.csv
es2csv -q '{"query": {"match_all": {}}, "filter":{"term": {"tags": "dev"}}}' -r -u http://login:[email protected]:6666/es/ -o ~/file.csv
"""
import sys
import argparse
import sys

import es2csv

__version__ = '5.5.2'
Expand All @@ -35,6 +36,9 @@ def main():
p.add_argument('-k', '--kibana-nested', dest='kibana_nested', action='store_true', help='Format nested fields in Kibana style.')
p.add_argument('-r', '--raw-query', dest='raw_query', action='store_true', help='Switch query format in the Query DSL.')
p.add_argument('-e', '--meta-fields', dest='meta_fields', action='store_true', help='Add meta-fields in output.')
p.add_argument('-j', '--json', dest='json', action='store_true', help='Output as line-separated JSON instead of CSV')
p.add_argument('-l', '--header-delimiter', dest='header_delimiter', type=str, help='Delimiter to use with JSON nested fields')
p.add_argument('-b', '--big-query', dest='big_query', action="store_true", help='Export with BigQuery compatibility')
p.add_argument('--verify-certs', dest='verify_certs', action='store_true', help='Verify SSL certificates. Default is %(default)s.')
p.add_argument('--ca-certs', dest='ca_certs', default=None, type=str, help='Location of CA bundle.')
p.add_argument('--client-cert', dest='client_cert', default=None, type=str, help='Location of Client Auth cert.')
Expand All @@ -51,7 +55,10 @@ def main():
es.create_connection()
es.check_indexes()
es.search_query()
es.write_to_csv()
if (opts.json != True):
es.write_to_csv()
else:
es.write_to_json()
es.clean_scroll_ids()


Expand Down