Skip to content

Commit

Permalink
master update (#2)
Browse files Browse the repository at this point in the history
* Update fluentd to v1.3 and add retag plugin

* Fix cleanup path for cached ruby gems

* update to 1.6 and add snappy

* Update Dockerfile

* make sure snappy stays

* add lz4 as well

* Add fluent-plugin-gelf-hs

* Added TTL hashmap expand plugin

* Add distance field

* Check type key existence

* Right field name

* Final field name

* fluent 1.12

Co-authored-by: Jaroslav Barton <[email protected]>
Co-authored-by: Tomas Zvala <[email protected]>
Co-authored-by: root <[email protected]>
Co-authored-by: Jaroslav Barton <[email protected]>
  • Loading branch information
5 people authored Sep 22, 2021
1 parent ce8138d commit 4cbc6ef
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 6 deletions.
18 changes: 12 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
FROM fluent/fluentd:latest

FROM fluent/fluentd:v1.12
USER root
# below RUN includes plugin as examples elasticsearch is not required
# you may customize including plugins as you wish

RUN apk add --update krb5-libs && \
RUN apk add --update krb5-libs snappy && \
apk add --update --virtual .build-deps libffi-dev \
sudo build-base ruby-dev && \
sudo build-base ruby-dev snappy-dev build-base libexecinfo automake autoconf libtool && \
sudo gem install fluent-plugin-kafka \
fluent-plugin-influxdb \
fluent-plugin-rewrite-tag-filter \
fluent-plugin-record-modifier \
fluent-plugin-juniper-telemetry \
fluent-plugin-snmp \
fluent-plugin-elasticsearch \
fluent-plugin-retag \
bigdecimal \
zookeeper && \
zookeeper \
snappy \
extlz4 \
fluent-plugin-gelf-hs && \
sudo gem sources --clear-all && \
apk del .build-deps && \
rm -rf /var/cache/apk/* \
/home/fluent/.gem/ruby/2.3.0/cache/*.gem
/usr/lib/ruby/gems/2.5.0/cache

ADD plugins /fluentd/plugins
65 changes: 65 additions & 0 deletions plugins/filter_expand_ttl.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
require 'fluent/plugin/filter'

module Fluent::Plugin
class ExpandTTLFilter < Fluent::Plugin::Filter
# Register this filter as "passthru"
Fluent::Plugin.register_filter('expand_ttl', self)

# config_param works like other plugins
desc "TTL map key"
config_param :ttl_map_key, :string, default: 'inTTL'

desc "TTL key"
config_param :ttl_key, :string, default: 'ttl'

desc "TTL distance"
config_param :ttl_value, :string, default: 'distance'

desc "Value field"
config_param :value_field, :string, default: 'count'

def configure(conf)
super
# do the usual configuration here
end

def ttl_distance(ttl)
if ttl > 128
return 255 - ttl
elsif ttl > 64
return 128 - ttl
else
return 64 - ttl
end
end

def expand(tag, time, record, es)
if record["type"].end_with?("TTL_SUM")
if record.key?(@ttl_map_key)
record[@ttl_map_key].each { |key, value|
new_record = record.clone
new_record[@ttl_key] = key
new_record[@ttl_value] = ttl_distance(key.to_i)
new_record[@value_field] = value
new_record.delete(@ttl_map_key)
es.add(time, new_record)
}
end
else
es.add(time, record)
end
end

def filter_stream(tag, es)
new_es = Fluent::MultiEventStream.new
es.each { |time, record|
begin
expand(tag, time, record, new_es)
rescue => e
router.emit_error_event(tag, time, record, e)
end
}
new_es
end
end
end

0 comments on commit 4cbc6ef

Please sign in to comment.