Skip to content

Commit

Permalink
Extend the splunk adapter to send JSON converted records over a TCP c…
Browse files Browse the repository at this point in the history
…onnection (#127)
  • Loading branch information
pyrco authored Jun 20, 2024
1 parent 3056a39 commit 61bb734
Show file tree
Hide file tree
Showing 2 changed files with 319 additions and 261 deletions.
130 changes: 79 additions & 51 deletions flow/record/adapter/splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,46 @@
[TAG]: optional value to add as "rdtag" output field when writing
[TOKEN]: Authentication token for sending data over HTTP(S)
[SOURCETYPE]: Set sourcetype of data. Defaults to records, but can also be set to JSON.
[SSL_VERIFY]: Whether to verify the server certificate when sending data over HTTP(S). Defaults to True.
[SSL_VERIFY]: Whether to verify the server certificate when sending data over HTTPS. Defaults to True.
"""

log = logging.getLogger(__package__)

# Amount of records to bundle into a single request when sending data over HTTP(S).
RECORD_BUFFER_LIMIT = 20

# https://docs.splunk.com/Documentation/Splunk/7.3.1/Data/Configureindex-timefieldextraction
RESERVED_SPLUNK_FIELDS = [
"_indextime",
"_time",
"index",
"punct",
"source",
"sourcetype",
"tag",
"type",
]
# List of reserved splunk fields that do not start with an `_`, as those will be escaped anyway.
# See: https://docs.splunk.com/Documentation/Splunk/9.2.1/Data/Aboutdefaultfields
RESERVED_SPLUNK_FIELDS = set(
[
"host",
"index",
"linecount",
"punct",
"source",
"sourcetype",
"splunk_server",
"timestamp",
],
)

RESERVED_SPLUNK_APP_FIELDS = set(
[
"tag",
"type",
]
)

RESERVED_RDUMP_FIELDS = set(
[
"rdtag",
"rdtype",
],
)

RESERVED_RECORD_FIELDS = ["_classification", "_generated", "_source"]
RESERVED_FIELDS = RESERVED_SPLUNK_FIELDS.union(RESERVED_SPLUNK_APP_FIELDS.union(RESERVED_RDUMP_FIELDS))

PREFIX_WITH_RD = set(RESERVED_SPLUNK_FIELDS + RESERVED_RECORD_FIELDS)
ESCAPE = "rd_"


class Protocol(Enum):
Expand All @@ -64,7 +81,13 @@ class SourceType(Enum):
RECORDS = "records"


def splunkify_key_value(record: Record, tag: Optional[str] = None) -> str:
def escape_field_name(field: str) -> str:
if field.startswith(("_", ESCAPE)) or field in RESERVED_FIELDS:
field = f"{ESCAPE}{field}"
return field


def record_to_splunk_kv_line(record: Record, tag: Optional[str] = None) -> str:
ret = []

ret.append(f'rdtype="{record._desc.name}"')
Expand All @@ -81,8 +104,7 @@ def splunkify_key_value(record: Record, tag: Optional[str] = None) -> str:

val = getattr(record, field)

if field in PREFIX_WITH_RD:
field = f"rd_{field}"
field = escape_field_name(field)

if val is None:
ret.append(f"{field}=None")
Expand All @@ -94,7 +116,25 @@ def splunkify_key_value(record: Record, tag: Optional[str] = None) -> str:
return " ".join(ret)


def splunkify_json(packer: JsonRecordPacker, record: Record, tag: Optional[str] = None) -> str:
def record_to_splunk_json(packer: JsonRecordPacker, record: Record, tag: Optional[str] = None) -> dict:
record_as_dict = packer.pack_obj(record)
json_dict = {}

for field, value in record_as_dict.items():
# Omit the _version field as the Splunk adapter has no reader support for deserialising records back.
if field == "_version":
continue
escaped_field = escape_field_name(field)
json_dict[escaped_field] = value

# Add rdump specific fields
json_dict["rdtag"] = tag
json_dict["rdtype"] = record._desc.name

return json_dict


def record_to_splunk_http_api_json(packer: JsonRecordPacker, record: Record, tag: Optional[str] = None) -> str:
ret = {}

indexer_fields = [
Expand All @@ -115,29 +155,13 @@ def splunkify_json(packer: JsonRecordPacker, record: Record, tag: Optional[str]
continue
ret[splunk_name] = to_str(val)

record_as_dict = packer.pack_obj(record)

# Omit the _version field as the Splunk adapter has no reader support for deserialising records back.
del record_as_dict["_version"]

# These fields end up in the 'event', but we have a few reserved field names. If those field names are in the
# record, we prefix them with 'rd_' (short for record descriptor)
for field in PREFIX_WITH_RD:
if field not in record_as_dict:
continue
new_field = f"rd_{field}"

record_as_dict[new_field] = record_as_dict[field]
del record_as_dict[field]

# Almost done, just have to add the tag and the type (i.e the record descriptor's name) to the event.
record_as_dict["rdtag"] = tag
ret["event"] = record_to_splunk_json(packer, record, tag)
return json.dumps(ret, default=packer.pack_obj)

# Yes.
record_as_dict["rdtype"] = record._desc.name

ret["event"] = record_as_dict
return json.dumps(ret, default=packer.pack_obj)
def record_to_splunk_tcp_api_json(packer: JsonRecordPacker, record: Record, tag: Optional[str] = None) -> str:
record_dict = record_to_splunk_json(packer, record, tag)
return json.dumps(record_dict, default=packer.pack_obj)


class SplunkWriter(AbstractWriter):
Expand All @@ -159,31 +183,31 @@ def __init__(

if sourcetype is None:
log.warning("No sourcetype provided, assuming 'records' sourcetype")
sourcetype = SourceType.RECORDS
self.sourcetype = SourceType.RECORDS
else:
self.sourcetype = SourceType(sourcetype)

parsed_url = urlparse(uri)
url_scheme = parsed_url.scheme.lower()

self.sourcetype = SourceType(sourcetype)
self.protocol = Protocol(url_scheme)

if self.protocol == Protocol.TCP and self.sourcetype != SourceType.RECORDS:
raise ValueError("For sending data to Splunk over TCP, only the 'records' sourcetype is allowed")

self.host = parsed_url.hostname
self.port = parsed_url.port

self.tag = tag
self.record_buffer = []
self._warned = False
self.packer = None

if self.sourcetype == SourceType.JSON:
self.packer = JsonRecordPacker(indent=4, pack_descriptors=False)
self.json_converter = None

if self.protocol == Protocol.TCP:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.SOL_TCP)
self.sock.connect((self.host, self.port))
self._send = self._send_tcp

if self.sourcetype == SourceType.JSON:
self.packer = JsonRecordPacker(indent=None, pack_descriptors=False)
self.json_converter = record_to_splunk_tcp_api_json

elif self.protocol in (Protocol.HTTP, Protocol.HTTPS):
if not HAS_HTTPX:
raise ImportError("The httpx library is required for sending data over HTTP(S)")
Expand Down Expand Up @@ -214,6 +238,10 @@ def __init__(

self._send = self._send_http

if self.sourcetype == SourceType.JSON:
self.packer = JsonRecordPacker(indent=4, pack_descriptors=False)
self.json_converter = record_to_splunk_http_api_json

def _cache_records_for_http(self, data: Optional[bytes] = None, flush: bool = False) -> Optional[bytes]:
# It's possible to call this function without any data, purely to flush. Hence this check.
if data:
Expand Down Expand Up @@ -252,9 +280,9 @@ def write(self, record: Record) -> None:
)

if self.sourcetype == SourceType.RECORDS:
rec = splunkify_key_value(record, self.tag)
rec = record_to_splunk_kv_line(record, self.tag)
else:
rec = splunkify_json(self.packer, record, self.tag)
rec = self.json_converter(self.packer, record, self.tag)

# Trail with a newline for line breaking.
data = to_bytes(rec) + b"\n"
Expand Down
Loading

0 comments on commit 61bb734

Please sign in to comment.