From 10aaa5b185116bc3cf84bafe2ea85388926b47e3 Mon Sep 17 00:00:00 2001 From: Ralph Verburg Date: Mon, 13 Jan 2025 11:34:10 +0100 Subject: [PATCH] Add queue size to Elastic adapter (#156) Add queue size limit to prevent high memory usage for plugins with multiple millions of records --------- Co-authored-by: Yun Zheng Hu --- flow/record/adapter/elastic.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/flow/record/adapter/elastic.py b/flow/record/adapter/elastic.py index 035199d..6f0b231 100644 --- a/flow/record/adapter/elastic.py +++ b/flow/record/adapter/elastic.py @@ -25,6 +25,7 @@ Optional arguments: [API_KEY]: base64 encoded api key to authenticate with (default: False) + [QUEUE_SIZE]: maximum queue size for writing records; limits memory usage (default: 100000) [INDEX]: name of the index to use (default: records) [VERIFY_CERTS]: verify certs of Elasticsearch instance (default: True) [HASH_RECORD]: make record unique by hashing record [slow] (default: False) @@ -43,6 +44,7 @@ def __init__( http_compress: str | bool = True, hash_record: str | bool = False, api_key: str | None = None, + queue_size: int = 100000, **kwargs, ) -> None: self.index = index @@ -50,11 +52,12 @@ def __init__( verify_certs = str(verify_certs).lower() in ("1", "true") http_compress = str(http_compress).lower() in ("1", "true") self.hash_record = str(hash_record).lower() in ("1", "true") + queue_size = int(queue_size) if not uri.lower().startswith(("http://", "https://")): uri = "http://" + uri - self.queue: queue.Queue[Record | StopIteration] = queue.Queue() + self.queue: queue.Queue[Record | StopIteration] = queue.Queue(maxsize=queue_size) self.event = threading.Event() self.es = elasticsearch.Elasticsearch( @@ -147,7 +150,7 @@ def streaming_bulk_thread(self) -> None: self.event.set() def write(self, record: Record) -> None: - self.queue.put_nowait(record) + self.queue.put(record) def flush(self) -> None: pass