Add queue size to Elastic adapter (#156)
Add a queue size limit to prevent high memory usage for plugins that produce millions of records

---------

Co-authored-by: Yun Zheng Hu <[email protected]>
raverburg and yunzheng authored Jan 13, 2025
1 parent 25d783a commit 10aaa5b
Showing 1 changed file with 5 additions and 2 deletions.
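
For context, a minimal sketch of how the new option reaches the writer, assuming flow.record's usual RecordWriter URI form (the host, the lowercase parameter spelling, and the record source below are illustrative, not taken from this commit). URI parameters arrive as strings, which is why the constructor coerces the value with int(queue_size):

    # Hedged usage sketch: localhost:9200 and some_record_source() are
    # placeholders, not values from this commit.
    from flow.record import RecordWriter

    # queue_size caps the in-memory write queue (default 100000); it is
    # passed as a string URI parameter and coerced to int by ElasticWriter.
    writer = RecordWriter("elastic://localhost:9200?index=records&queue_size=50000")
    for record in some_record_source():  # hypothetical iterable of Record objects
        writer.write(record)
    writer.close()

With the bound in place, a producer that outpaces Elasticsearch waits in write() rather than accumulating records in memory.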
7 changes: 5 additions & 2 deletions flow/record/adapter/elastic.py
@@ -25,6 +25,7 @@
     Optional arguments:
         [API_KEY]: base64 encoded api key to authenticate with (default: False)
+        [QUEUE_SIZE]: maximum queue size for writing records; limits memory usage (default: 100000)
         [INDEX]: name of the index to use (default: records)
         [VERIFY_CERTS]: verify certs of Elasticsearch instance (default: True)
         [HASH_RECORD]: make record unique by hashing record [slow] (default: False)
@@ -43,18 +44,20 @@ def __init__(
         http_compress: str | bool = True,
         hash_record: str | bool = False,
         api_key: str | None = None,
+        queue_size: int = 100000,
         **kwargs,
     ) -> None:
         self.index = index
         self.uri = uri
         verify_certs = str(verify_certs).lower() in ("1", "true")
         http_compress = str(http_compress).lower() in ("1", "true")
         self.hash_record = str(hash_record).lower() in ("1", "true")
+        queue_size = int(queue_size)
 
         if not uri.lower().startswith(("http://", "https://")):
             uri = "http://" + uri
 
-        self.queue: queue.Queue[Record | StopIteration] = queue.Queue()
+        self.queue: queue.Queue[Record | StopIteration] = queue.Queue(maxsize=queue_size)
         self.event = threading.Event()
 
         self.es = elasticsearch.Elasticsearch(
@@ -147,7 +150,7 @@ def streaming_bulk_thread(self) -> None:
         self.event.set()
 
     def write(self, record: Record) -> None:
-        self.queue.put_nowait(record)
+        self.queue.put(record)
 
     def flush(self) -> None:
         pass
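
The companion change from put_nowait() to put() is what makes the bound effective: on a full bounded queue, put_nowait() raises queue.Full immediately, while put() blocks the producer until the bulk-indexing thread drains an item, giving backpressure instead of an error. A self-contained sketch of that behaviour (toy bound, and a sleep standing in for Elasticsearch indexing; the adapter itself uses a StopIteration instance as its shutdown sentinel):

    import queue
    import threading
    import time

    q: queue.Queue[int | None] = queue.Queue(maxsize=3)  # toy bound; the adapter defaults to 100000

    def slow_consumer() -> None:
        # Stands in for the streaming_bulk thread that drains records to Elasticsearch.
        while True:
            item = q.get()
            if item is None:  # shutdown sentinel
                break
            time.sleep(0.1)  # simulate indexing latency

    t = threading.Thread(target=slow_consumer)
    t.start()

    for i in range(10):
        q.put(i)  # blocks once 3 items are pending, instead of raising queue.Full
        print(f"queued {i}")

    q.put(None)
    t.join()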
