-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathlogstash.conf
101 lines (95 loc) · 4.21 KB
/
logstash.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Read network syslog lines from a local file. Every event is given
# type "syslog" (tested by the filter section) and the tag "netsyslog".
input {
  file {
    type           => "syslog"
    tags           => [ "netsyslog" ]
    path           => [ "/var/log/network.log" ]
    # Read the file from its first line rather than only tailing new lines.
    start_position => "beginning"
  }
} #end input block
# Parse the syslog-wrapped, comma-separated log lines into typed fields,
# geolocate public source/destination addresses and fingerprint each flow.
filter {
  if [type] == "syslog" {
    grok {
      #strips timestamp and host off of the front of the syslog message leaving the raw message generated by the syslog client and saves it as "raw_message"
      #patterns_dir => "/opt/logstash/patterns"
      # NOTE(review): this captures straight into @timestamp; the date filter
      # below sets the event timestamp from GenerateTime afterwards. Confirm
      # the capture name is intentional.
      match => [ "message", "%{TIMESTAMP_ISO8601:@timestamp} %{HOST:syslog_host} %{GREEDYDATA:raw_message}" ]
    }
  }

  # Split the raw payload into named columns. This and the following filters
  # are not guarded by the type check above, so they run on every event.
  csv {
    source => "raw_message"
    columns => [ "PaloAltoDomain","ReceiveTime","SerialNum","Type","Threat-ContentType","ConfigVersion","GenerateTime","SourceAddress","DestinationAddress","NATSourceIP","NATDestinationIP","Rule","SourceUser","DestinationUser","Application","VirtualSystem","SourceZone","DestinationZone","InboundInterface","OutboundInterface","LogAction","TimeLogged","SessionID","RepeatCount","SourcePort","DestinationPort","NATSourcePort","NATDestinationPort","Flags","IPProtocol","Action","Bytes","BytesSent","BytesReceived","Packets","StartTime","ElapsedTimeInSec","Category","Padding","seqno","actionflags","SourceCountry","DestinationCountry","cpadding","pkts_sent","pkts_received" ]
  }

  # Parse GenerateTime, interpreted in the America/New_York timezone.
  date {
    timezone => "America/New_York"
    match => [ "GenerateTime", "YYYY/MM/dd HH:mm:ss" ]
  }

  #convert fields to proper format
  # The geo field conversions live next to their geoip filters below: the
  # fields do not exist yet at this point in the pipeline.
  mutate {
    convert => [ "Bytes", "integer" ]
    convert => [ "BytesReceived", "integer" ]
    convert => [ "BytesSent", "integer" ]
    convert => [ "ElapsedTimeInSec", "integer" ]
    convert => [ "NATDestinationPort", "integer" ]
    convert => [ "NATSourcePort", "integer" ]
    convert => [ "Packets", "integer" ]
    convert => [ "pkts_received", "integer" ]
    convert => [ "pkts_sent", "integer" ]
    convert => [ "seqno", "integer" ]
    # Replace spaces in Rule, and spaces or hyphens in Application, with "_".
    gsub => [ "Rule", " ", "_",
              "Application", "( |-)", "_" ]
    # The parsed columns supersede the original line and the intermediate payload.
    remove_field => [ "message", "raw_message" ]
  }

  #Geolocate logs that have SourceAddress and if that SourceAddress is a non-RFC1918 address
  if [SourceAddress] and [SourceAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
    geoip {
      database => "/opt/logstash/GeoLiteCity.dat"
      source => "SourceAddress"
      target => "SourceGeo"
    }
    # geoip writes its results under the "SourceGeo" target, so the numeric
    # fields are addressed as nested [SourceGeo][...] references.
    mutate {
      convert => [ "[SourceGeo][area_code]", "integer" ]
      convert => [ "[SourceGeo][dma_code]", "integer" ]
      convert => [ "[SourceGeo][latitude]", "float" ]
      convert => [ "[SourceGeo][longitude]", "float" ]
    }
    #Delete 0,0 in SourceGeo.location if equal to 0,0
    # NOTE(review): the regex can only match when location is a string; confirm
    # the shape the installed geoip plugin version emits for this field.
    if ([SourceGeo][location] and [SourceGeo][location] =~ "0,0") {
      mutate {
        replace => [ "[SourceGeo][location]", "" ]
      }
    }
  }

  #Geolocate logs that have DestinationAddress and if that DestinationAddress is a non-RFC1918 address
  if [DestinationAddress] and [DestinationAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
    geoip {
      database => "/opt/logstash/GeoLiteCity.dat"
      source => "DestinationAddress"
      target => "DestinationGeo"
    }
    # Same conversions as for the source side, under the "DestinationGeo" target.
    mutate {
      convert => [ "[DestinationGeo][area_code]", "integer" ]
      convert => [ "[DestinationGeo][dma_code]", "integer" ]
      convert => [ "[DestinationGeo][latitude]", "float" ]
      convert => [ "[DestinationGeo][longitude]", "float" ]
    }
    #Delete 0,0 in DestinationGeo.location if equal to 0,0
    # NOTE(review): the regex can only match when location is a string; confirm
    # the shape the installed geoip plugin version emits for this field.
    if ([DestinationGeo][location] and [DestinationGeo][location] =~ "0,0") {
      mutate {
        # Blank the geo location itself (not a field under DestinationAddress).
        replace => [ "[DestinationGeo][location]", "" ]
      }
    }
  }

  #Takes the 5-tuple of source address, source port, destination address, destination port, and protocol and does a SHA1 hash to fingerprint the flow. This is a useful
  #way to be able to do top N terms queries on flows, not just on one field.
  if [SourceAddress] and [DestinationAddress] {
    fingerprint {
      concatenate_sources => true
      method => "SHA1"
      key => "logstash"
      source => [ "SourceAddress", "SourcePort", "DestinationAddress", "DestinationPort", "IPProtocol" ]
    }
  }
} #end filter block
# Send every processed event to the Elasticsearch cluster "test-elastic" at
# 127.0.0.1, using the index template stored alongside the Logstash install.
output {
  # Console outputs for debugging, left disabled.
  #stdout { codec => rubydebug }
  #stdout { debug => true }
  elasticsearch {
    host               => "127.0.0.1"
    cluster            => "test-elastic"
    protocol           => "node"
    node_name          => "logstash"
    template           => "/opt/logstash/elasticsearch-template.json"
    # Replace any template of the same name already stored in Elasticsearch.
    template_overwrite => true
  }
} #end output block