Skip to content
This repository has been archived by the owner on Jun 17, 2023. It is now read-only.

Commit

Permalink
Fix/es (#311)
Browse files Browse the repository at this point in the history
* adding pidfile support for monit deployment

* http content gzip

* ..

* bugfixes

* tweaking reqs

* tweaking compression for httpd

* fixing some store queues

* bugfixes

* cleaning up health checks

* cleaning up health checks

* adding gevent and gunicorn deps

* bugfix to filter

* requirements update

* fixing vagrant for es tests

* fixing elasticsearch tests

* adding flag to vagrantfile
  • Loading branch information
wesyoung authored May 29, 2017
1 parent 6a5a79c commit 6ca2979
Show file tree
Hide file tree
Showing 16 changed files with 211 additions and 62 deletions.
4 changes: 3 additions & 1 deletion Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ VAGRANTFILE_LOCAL = 'Vagrantfile.local'
sdist=ENV['CIF_ANSIBLE_SDIST']
es=ENV['CIF_ANSIBLE_ES']
hunter_threads=ENV['CIF_HUNTER_THREADS']
hunter_advanced=ENV['CIF_HUNTER_ADVANCED']
geo_fqdn=ENV['CIF_GATHERER_GEO_FQDN']
csirtg_token=ENV['CSIRTG_TOKEN']

Expand All @@ -23,6 +24,7 @@ $script = <<SCRIPT
export CIF_ANSIBLE_SDIST=#{sdist}
export CIF_ANSIBLE_ES=#{es}
export CIF_HUNTER_THREADS=#{hunter_threads}
export CIF_HUNTER_ADVANCED=#{hunter_advanced}
export CIF_GATHERER_GEO_FQDN=#{geo_fqdn}
export CIF_BOOTSTRAP_TEST=1
export CSIRTG_TOKEN=#{csirtg_token}
Expand All @@ -40,7 +42,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.vm.network :forwarded_port, guest: 443, host: 8443

config.vm.provider :virtualbox do |vb|
vb.customize ["modifyvm", :id, "--cpus", "2", "--ioapic", "on", "--memory", "2048" ]
vb.customize ["modifyvm", :id, "--cpus", "2", "--ioapic", "on", "--memory", "4096" ]
end

if File.file?(VAGRANTFILE_LOCAL)
Expand Down
2 changes: 1 addition & 1 deletion cif/gatherer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
LINGER = 0

logger = logging.getLogger(__name__)
TRACE = os.environ.get('CIF_ROUTER_TRACE') or os.environ.get('CIF_GATHERER_TRACE')
TRACE = os.environ.get('CIF_GATHERER_TRACE')

logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)
Expand Down
2 changes: 1 addition & 1 deletion cif/gatherer/geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _resolve(self, indicator):

i = indicator.indicator
if indicator.itype in ['url', 'fqdn']:
if not ENABLE_FQDN:
if ENABLE_FQDN in ['0', 0, False, None]:
return

if indicator.itype == 'url':
Expand Down
59 changes: 50 additions & 9 deletions cif/httpd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import logging
import os
import gc
import traceback
import textwrap
from argparse import ArgumentParser
from argparse import RawDescriptionHelpFormatter
Expand All @@ -11,12 +13,12 @@
from flask_limiter.util import get_remote_address
from flask_bootstrap import Bootstrap
from os import path

from cif.constants import ROUTER_ADDR
from cif.constants import ROUTER_ADDR, RUNTIME_PATH
from cifsdk.utils import get_argument_parser, setup_logging, setup_signals, setup_runtime_path
from .common import pull_token
from .views.ping import PingAPI
from .views.help import HelpAPI
from .views.health import HealthAPI
from .views.tokens import TokensAPI
from .views.indicators import IndicatorsAPI
from .views.feed import FeedAPI
Expand All @@ -32,15 +34,18 @@
HTTP_LISTEN_PORT = 5000
HTTP_LISTEN_PORT = os.environ.get('CIF_HTTPD_LISTEN_PORT', HTTP_LISTEN_PORT)

LIMIT_DAY = os.environ.get('CIF_HTTPD_LIMIT_DAY', 250000)
LIMIT_HOUR = os.environ.get('CIF_HTTPD_LIMIT_HOUR', 100000)
LIMIT_MIN = os.getenv('CIF_HTTPD_LIMIT_MINUTE', 120)

PIDFILE = os.getenv('CIF_HTTPD_PIDFILE', '{}/cif_httpd.pid'.format(RUNTIME_PATH))

SECRET_KEY = os.getenv('CIF_HTTPD_SECRET_KEY', os.urandom(24))
HTTPD_TOKEN = os.getenv('CIF_HTTPD_TOKEN')

HTTPD_UI_HOSTS = os.getenv('CIF_HTTPD_UI_HOSTS', '127.0.0.1')
HTTPD_UI_HOSTS = HTTPD_UI_HOSTS.split(',')

HTTPD_PROXY = os.getenv('CIF_HTTPD_PROXY')

extra_dirs = ['cif/httpd/templates', ]
extra_files = extra_dirs[:]
for extra_dir in extra_dirs:
Expand All @@ -50,17 +55,23 @@
if path.isfile(filename):
extra_files.append(filename)


def proxy_get_remote_address():
if HTTPD_PROXY in ['1', 1]:
return request.access_route[-1]

return get_remote_address()

app = Flask(__name__)
app.secret_key = SECRET_KEY
Bootstrap(app)
remote = ROUTER_ADDR
logger = logging.getLogger(__name__)
limiter = Limiter(
app,
key_func=get_remote_address,
key_func=proxy_get_remote_address,
global_limits=[
'{} per day'.format(LIMIT_DAY),
'{} per hour'.format(LIMIT_HOUR)
'{} per minute'.format(LIMIT_MIN)
]
)

Expand All @@ -75,13 +86,20 @@

app.add_url_rule('/', view_func=HelpAPI.as_view('/'))
app.add_url_rule('/help', view_func=HelpAPI.as_view('help'))
app.add_url_rule('/health', view_func=HealthAPI.as_view('health'))
app.add_url_rule('/ping', view_func=PingAPI.as_view('ping'))
app.add_url_rule('/tokens', view_func=TokensAPI.as_view('tokens'))
app.add_url_rule('/indicators', view_func=IndicatorsAPI.as_view('indicators'))
app.add_url_rule('/search', view_func=IndicatorsAPI.as_view('search'))
app.add_url_rule('/feed', view_func=FeedAPI.as_view('feed'))
app.add_url_rule('/help/confidence', view_func=ConfidenceAPI.as_view('confidence'))


@app.teardown_request
def teardown_request(exception):
gc.collect()


@app.before_request
def before_request():
"""
Expand Down Expand Up @@ -113,7 +131,7 @@ def before_request():
else:
return

if request.endpoint not in ['/', 'help', 'confidence']:
if request.endpoint not in ['/', 'help', 'confidence', 'health']:

t = pull_token()
if not t or t == 'None':
Expand Down Expand Up @@ -169,6 +187,7 @@ def main():
p.add_argument('--listen', help='specify the interface to listen on [default %(default)s]', default=HTTP_LISTEN)
p.add_argument('--listen-port', help='specify the port to listen on [default %(default)s]',
default=HTTP_LISTEN_PORT)
p.add_argument('--pidfile', help='specify pidfile location [default: %(default)s]', default=PIDFILE)

p.add_argument('--fdebug', action='store_true')

Expand All @@ -181,6 +200,22 @@ def main():

setup_runtime_path(args.runtime_path)

if not args.fdebug:
# http://stackoverflow.com/a/789383/7205341
pid = str(os.getpid())
logger.debug("pid: %s" % pid)

if os.path.isfile(args.pidfile):
logger.critical("%s already exists, exiting" % args.pidfile)
raise SystemExit

try:
pidfile = open(args.pidfile, 'w')
pidfile.write(pid)
pidfile.close()
except PermissionError as e:
logger.error('unable to create pid %s' % args.pidfile)

try:
logger.info('pinging router...')
#app.config["SECRET_KEY"] = SECRET_KEY
Expand All @@ -189,7 +224,13 @@ def main():

except KeyboardInterrupt:
logger.info('shutting down...')
raise SystemExit

except Exception as e:
logger.critical(e)
traceback.print_exc()

if os.path.isfile(args.pidfile):
os.unlink(args.pidfile)

if __name__ == "__main__":
main()
45 changes: 45 additions & 0 deletions cif/httpd/views/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from flask import current_app
from cifsdk.client.zeromq import ZMQ as Client
from cif.constants import ROUTER_ADDR
from cifsdk.exceptions import TimeoutError, AuthError
from ..common import jsonify_unauth, jsonify_unknown, jsonify_success
from flask.views import MethodView
import os
import gc
from flask import request
from flask_limiter.util import get_remote_address

HTTPD_TOKEN = os.getenv('CIF_HTTPD_TOKEN', False)


class HealthAPI(MethodView):
def get(self):

# feeds get large need to force some cleanup
gc.collect()

if get_remote_address() != request.access_route[-1]:
return jsonify_unauth()

if not HTTPD_TOKEN:
return jsonify_success()

remote = ROUTER_ADDR
if current_app.config.get('CIF_ROUTER_ADDR'):
remote = current_app.config['CIF_ROUTER_ADDR']

try:
r = Client(remote, HTTPD_TOKEN).ping()
r = Client(remote, HTTPD_TOKEN).indicators_search({'indicator': 'example.com', 'nolog': '1'})
r = True

except TimeoutError:
return jsonify_unknown(msg='timeout', code=408)

except AuthError:
return jsonify_unauth()

if not r:
return jsonify_unknown(503)

return jsonify_success()
10 changes: 9 additions & 1 deletion cif/httpd/views/indicators.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from cif.constants import ROUTER_ADDR, PYVERSION
from cifsdk.exceptions import AuthError, TimeoutError, InvalidSearch, SubmissionFailed, CIFBusy
import logging
import zlib

remote = ROUTER_ADDR

logger = logging.getLogger('cif-httpd')
Expand Down Expand Up @@ -79,7 +81,13 @@ def post(self):
logger.info('fireball mode')
fireball = True
try:
data = request.data.decode('utf-8')

data = request.data
if request.headers.get('Content-Encoding') and request.headers['Content-Encoding'] == 'deflate':
data = zlib.decompress(data)

data = data.decode('utf-8')

r = Client(remote, pull_token()).indicators_create(data, nowait=nowait, fireball=fireball)
if nowait:
r = 'pending'
Expand Down
2 changes: 1 addition & 1 deletion cif/hunter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
EXCLUDE = os.environ.get('CIF_HUNTER_EXCLUDE', None)
HUNTER_ADVANCED = os.getenv('CIF_HUNTER_ADVANCED', 0)

TRACE = os.environ.get('CIF_ROUTER_TRACE') or os.environ.get('CIF_HUNTER_TRACE')
TRACE = os.environ.get('CIF_HUNTER_TRACE')

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
Expand Down
29 changes: 28 additions & 1 deletion cif/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from time import sleep
import zmq
import os
from cif.constants import ROUTER_ADDR, STORE_ADDR, HUNTER_ADDR, GATHERER_ADDR, GATHERER_SINK_ADDR, HUNTER_SINK_ADDR
import sys
from cif.constants import ROUTER_ADDR, STORE_ADDR, HUNTER_ADDR, GATHERER_ADDR, GATHERER_SINK_ADDR, HUNTER_SINK_ADDR, RUNTIME_PATH
from cifsdk.constants import CONFIG_PATH
from cifsdk.utils import setup_logging, get_argument_parser, setup_signals, setup_runtime_path, read_config
from cif.hunter import Hunter
Expand All @@ -19,6 +20,7 @@
import multiprocessing as mp
from cifsdk.msg import Msg


HUNTER_MIN_CONFIDENCE = 4
HUNTER_THREADS = os.getenv('CIF_HUNTER_THREADS', 0)
HUNTER_ADVANCED = os.getenv('CIF_HUNTER_ADVANCED', 0)
Expand All @@ -42,6 +44,9 @@
STORE_NODES = os.getenv('CIF_STORE_NODES')


PIDFILE = os.getenv('CIF_ROUTER_PIDFILE', '{}/cif_router.pid'.format(RUNTIME_PATH))


class Router(object):

def __enter__(self):
Expand Down Expand Up @@ -263,6 +268,8 @@ def main():

p.add_argument('--logging-ignore', help='set logging to WARNING for specific modules')

p.add_argument('--pidfile', help='specify pidfile location [default: %(default)s]', default=PIDFILE)

args = p.parse_args()
setup_logging(args)
logger = logging.getLogger(__name__)
Expand All @@ -283,24 +290,44 @@ def main():
setup_runtime_path(args.runtime_path)
setup_signals(__name__)

# http://stackoverflow.com/a/789383/7205341
pid = str(os.getpid())
logger.debug("pid: %s" % pid)

if os.path.isfile(args.pidfile):
logger.critical("%s already exists, exiting" % args.pidfile)
raise SystemExit

try:
pidfile = open(args.pidfile, 'w')
pidfile.write(pid)
pidfile.close()
except PermissionError as e:
logger.error('unable to create pid %s' % args.pidfile)

with Router(listen=args.listen, hunter=args.hunter, store_type=args.store, store_address=args.store_address,
store_nodes=args.store_nodes, hunter_token=args.hunter_token, hunter_threads=args.hunter_threads,
gatherer_threads=args.gatherer_threads) as r:
try:
logger.info('starting router..')
r.start()

except KeyboardInterrupt:
# todo - signal to threads to shut down and wait for them to finish
logger.info('shutting down via SIGINT...')

except SystemExit:
logger.info('shutting down via SystemExit...')

except Exception as e:
logger.critical(e)
traceback.print_exc()

r.stop()

logger.info('Shutting down')
if os.path.isfile(args.pidfile):
os.unlink(args.pidfile)

if __name__ == "__main__":
main()
Loading

0 comments on commit 6ca2979

Please sign in to comment.